Commit 05515f9f authored by Vincent Pelletier's avatar Vincent Pelletier

all: Switch to timezone-aware cryptography object attributes when available

Fixes errors with Python 3.12, which rejects comparison between aware and
naïve datetimes.
parent a5831c79
......@@ -329,7 +329,7 @@ class CertificateAuthority(object):
ca_crt = ca_key_pair['crt']
public_key = csr.public_key()
now = datetime.datetime.utcnow() - DESYNCHORNISATION_TOLERANCE
now = utils.utcnow() - DESYNCHORNISATION_TOLERANCE
builder = x509.CertificateBuilder(
subject_name=template_csr.subject,
issuer_name=ca_crt.subject,
......@@ -510,8 +510,8 @@ class CertificateAuthority(object):
"""
if (
self._ca_key_size is not None and not self._ca_key_pairs_list or (
self._ca_key_pairs_list[-1]['crt'].not_valid_after -
datetime.datetime.utcnow()
utils.getNotValidAfter(self._ca_key_pairs_list[-1]['crt']) -
utils.utcnow()
).total_seconds() / self._crt_life_time.total_seconds() <= 2
# pylint: disable=consider-using-with
) and self._ca_renewal_lock.acquire(False):
......@@ -556,7 +556,7 @@ class CertificateAuthority(object):
),
] + self._ca_extension_list
public_key = private_key.public_key()
now = datetime.datetime.utcnow() - DESYNCHORNISATION_TOLERANCE
now = utils.utcnow() - DESYNCHORNISATION_TOLERANCE
crt_builder = x509.CertificateBuilder(
subject_name=subject,
issuer_name=subject,
......@@ -593,7 +593,7 @@ class CertificateAuthority(object):
backend=_cryptography_backend,
)
self._storage.appendCAKeyPair(
utils.datetime2timestamp(certificate.not_valid_after),
utils.datetime2timestamp(utils.getNotValidAfter(certificate)),
{
'key_pem': utils.dump_privatekey(private_key),
'crt_pem': utils.dump_certificate(certificate),
......@@ -613,9 +613,9 @@ class CertificateAuthority(object):
before its use can start.
"""
self._renewCAIfNeeded()
now = datetime.datetime.utcnow()
now = utils.utcnow()
for key_pair in reversed(self._ca_key_pairs_list):
if key_pair['crt'].not_valid_before + self._crt_life_time < now:
if utils.getNotValidBefore(key_pair['crt']) + self._crt_life_time < now:
# This CA cert is valid for more than one certificate life time,
# we can assume clients to know it (as they had to renew their
# cert at least once since it became available) so we can start
......@@ -634,8 +634,8 @@ class CertificateAuthority(object):
self._renewCAIfNeeded()
# XXX: _loadCAKeyPairList seems a bit too expensive for such simple operation.
# So do out-of-storage self._ca_key_pairs_list pruning only.
now = datetime.datetime.utcnow()
while self._ca_key_pairs_list[0]['crt'].not_valid_after <= now:
now = utils.utcnow()
while utils.getNotValidAfter(self._ca_key_pairs_list[0]['crt']) <= now:
self._ca_key_pairs_list.pop(0)
return utils.dump_certificate(self._ca_key_pairs_list[0]['crt'])
......@@ -697,7 +697,7 @@ class CertificateAuthority(object):
# An entry MUST NOT be removed
# from the CRL until it appears on one regularly scheduled CRL issued
# beyond the revoked certificate's validity period.
crt.not_valid_after + self._crl_life_time,
utils.getNotValidAfter(crt) + self._crl_life_time,
),
)
self._current_crl_dict.clear()
......@@ -719,7 +719,7 @@ class CertificateAuthority(object):
serial=serial,
expiration_date=utils.datetime2timestamp(
max(
x.not_valid_after for x in self.getCACertificateList()
utils.getNotValidAfter(x) for x in self.getCACertificateList()
) + self._crl_life_time,
),
)
......@@ -765,7 +765,7 @@ class CertificateAuthority(object):
"""
Return PEM-encoded certificate revocation lists for all CAs.
"""
now = datetime.datetime.utcnow()
now = utils.utcnow()
result = {}
crl_pem_dict = self._current_crl_dict
self._renewCAIfNeeded()
......
......@@ -84,7 +84,7 @@ class RetryingCaucaseClient(CaucaseClient):
connection.close() # Resets HTTPConnection state machine.
# Note: repr(str(exception)) is nicer than repr(exception), without
# letting non-printable characters through.
next_try = datetime.datetime.utcnow() + datetime.timedelta(seconds=10)
next_try = utils.utcnow() + datetime.timedelta(seconds=10)
print(
u'Got a network error, retrying at %s, %s: %r' % (
next_try.strftime(u'%Y-%m-%d %H:%M:%S +0000'),
......@@ -270,7 +270,7 @@ class CLICaucaseClient(object):
'was not signed by this CA, revoked or otherwise invalid, skipping',
)
continue
if renewal_deadline < old_crt.not_valid_after:
if renewal_deadline < utils.getNotValidAfter(old_crt):
self._print(crt_path, 'did not reach renew threshold, not renewing')
continue
new_key_pem, new_crt_pem = self._client.renewCertificate(
......@@ -653,7 +653,7 @@ def main(argv=None, stdout=sys.stdout, stderr=sys.stderr):
error = client.revokeCRT(error, args.revoke_crt)
updated, error = client.renewCRT(
crt_key_list=args.renew_crt,
renewal_deadline=datetime.datetime.utcnow() + datetime.timedelta(
renewal_deadline=utils.utcnow() + datetime.timedelta(
days=args.threshold,
),
key_len=args.key_len,
......@@ -896,14 +896,14 @@ def updater(argv=None, until=utils.until):
# through.
client.getCertificateSigningRequest(csr_id)
# Still here ? Ok, wait a bit and try again.
until(datetime.datetime.utcnow() + datetime.timedelta(seconds=60))
until(utils.utcnow() + datetime.timedelta(seconds=60))
else:
with open(args.crt, 'ab') as crt_file:
crt_file.write(crt_pem)
updated = True
break
print('Bootstrap done')
next_deadline = datetime.datetime.utcnow()
next_deadline = utils.utcnow()
while True:
print(
'Next wake-up at',
......@@ -938,12 +938,12 @@ def updater(argv=None, until=utils.until):
):
next_deadline = min(
next_deadline,
crl.next_update - crl_threshold,
utils.getNextUpdate(crl) - crl_threshold,
)
if args.crt:
crt_pem, key_pem, key_path = utils.getKeyPair(args.crt, args.key)
crt = utils.load_certificate(crt_pem, ca_crt_list, None)
if crt.not_valid_after - threshold <= now:
if utils.getNotValidAfter(crt) - threshold <= now:
print('Renewing', args.crt)
new_key_pem, new_crt_pem = client.renewCertificate(
old_crt=crt,
......@@ -968,7 +968,7 @@ def updater(argv=None, until=utils.until):
updated = True
next_deadline = min(
next_deadline,
crt.not_valid_after - threshold,
utils.getNotValidAfter(crt) - threshold,
)
next_deadline = max(
next_deadline,
......
......@@ -323,7 +323,7 @@ class CaucaseClient(object):
utils.load_valid_ca_certificate_list(
ca_pem_list=self._ca_crt_pem_list,
),
key=lambda x: x.not_valid_before,
key=utils.getNotValidBefore,
)[-1]
result = []
for entry in json.loads(
......
......@@ -210,7 +210,7 @@ class CaucaseWSGIRequestHandler(WSGIRequestHandler):
- ...but, because of how impractical it is in python to get system current
timezone (including DST considerations), time it always logged in GMT
"""
now = datetime.datetime.utcnow()
now = utils.utcnow()
return now.strftime(
'%d/' + self.monthname[now.month] + '/%Y:%H:%M:%S +0000',
)
......@@ -353,7 +353,7 @@ def getSSLContext(
exists = False
if exists:
if renew or (
old_crt.not_valid_after - threshold_delta < datetime.datetime.utcnow()
utils.getNotValidAfter(old_crt) - threshold_delta < utils.utcnow()
):
new_key = utils.generatePrivateKey(key_len)
new_key_pem = utils.dump_privatekey(new_key)
......@@ -431,11 +431,11 @@ def getSSLContext(
ssl_context.load_cert_chain(server_key_path)
return (
ssl_context,
utils.load_certificate(
utils.getNotValidAfter(utils.load_certificate(
utils.getLeafCertificate(server_key_path),
http_cas_certificate_list,
None,
).not_valid_after - threshold_delta,
)) - threshold_delta,
)
def main(
......@@ -840,7 +840,7 @@ def main(
)
) + backup_period
except ValueError:
next_backup = datetime.datetime.utcnow()
next_backup = utils.utcnow()
next_deadline = min(
next_deadline,
next_backup,
......@@ -1075,7 +1075,7 @@ def manage(argv=None, stdout=sys.stdout):
]
import_ca[component_name] = component_value
import_ca['from'].append(name)
now = utils.datetime2timestamp(datetime.datetime.utcnow())
now = utils.datetime2timestamp(utils.utcnow())
imported = 0
cas_db = SQLite3Storage(
db_path,
......@@ -1092,7 +1092,7 @@ def manage(argv=None, stdout=sys.stdout):
file=stdout,
)
continue
expiration = utils.datetime2timestamp(crt.not_valid_after)
expiration = utils.datetime2timestamp(utils.getNotValidAfter(crt))
if expiration < now:
print('Skipping expired certificate from', found_from, file=stdout)
del import_ca_dict[identifier]
......@@ -1142,7 +1142,7 @@ def manage(argv=None, stdout=sys.stdout):
for x in db.getCAKeyPairList(prune=False)
]
latest_ca_not_after = max(
x.not_valid_after
utils.getNotValidAfter(x)
for x in trusted_ca_crt_set
)
already_revoked_count = revoked_count = 0
......@@ -1155,10 +1155,10 @@ def manage(argv=None, stdout=sys.stdout):
).value.crl_number
if crl_number is None:
crl_number = current_crl_number
crl_last_update = crl.last_update
crl_last_update = utils.getLastUpdate(crl)
else:
crl_number = max(crl_number, current_crl_number)
crl_last_update = max(crl_last_update, crl.last_update)
crl_last_update = max(crl_last_update, utils.getLastUpdate(crl))
for revoked in crl:
try:
db.revoke(
......
......@@ -30,7 +30,7 @@ from time import time
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from .exceptions import NoStorage, NotFound, Found
from .utils import toBytes, toUnicode, datetime2timestamp
from .utils import toBytes, toUnicode, datetime2timestamp, getLastUpdate
__all__ = ('SQLite3Storage', )
......@@ -164,10 +164,16 @@ class SQLite3Storage(local):
if crl_row is not None:
self._setConfig(
'crl_last_update',
str(datetime2timestamp(x509.load_pem_x509_crl(
str(
datetime2timestamp(
getLastUpdate(
x509.load_pem_x509_crl(
toBytes(crl_row['crl']),
_cryptography_backend,
).last_update)),
),
),
),
),
)
self._execute(c, 'DROP TABLE IF EXISTS %(prefix)scrl')
......
......@@ -361,7 +361,7 @@ class UntilEvent(object):
- if self.action is ON_EVENT_RAISE, raises utils.SleepInterrupt.
- if self.action is ON_EVENT_EXPIRE, deadline time it returned.
"""
now = datetime.datetime.utcnow()
now = utils.utcnow()
if now < deadline:
self._deadline = deadline
self._wait_event.set()
......@@ -492,7 +492,7 @@ class CaucaseTest(TestCase):
Build a reasonably-realistic CA, return key & self-signed cert.
"""
if not_before is None: # pragma: no cover
not_before = datetime.datetime.utcnow()
not_before = utils.utcnow()
if not_after is None: # pragma: no cover
not_after = not_before + datetime.timedelta(days=10)
private_key = utils.generatePrivateKey(2048)
......@@ -559,7 +559,7 @@ class CaucaseTest(TestCase):
Build a reasonably-realistic signed cert, return key & self-signed cert.
"""
if not_before is None: # pragma: no cover
not_before = datetime.datetime.utcnow()
not_before = utils.utcnow()
if not_after is None: # pragma: no cover
not_after = not_before + datetime.timedelta(days=10)
crt_key = utils.generatePrivateKey(2048)
......@@ -607,7 +607,7 @@ class CaucaseTest(TestCase):
next_update=None,
):
if last_update is None: # pragma: no cover
last_update = datetime.datetime.utcnow()
last_update = utils.utcnow()
if next_update is None: # pragma: no cover
next_update = last_update + datetime.timedelta(days=5)
return x509.CertificateRevocationListBuilder(
......@@ -782,12 +782,12 @@ class CaucaseTest(TestCase):
not_valid_after dates so that its remaining validity period
becomes <delta> and its validity span stays unchanged.
"""
new_not_valid_after_date = datetime.datetime.utcnow() + delta
new_not_valid_after_date = utils.utcnow() + delta
return x509.CertificateBuilder(
subject_name=crt.subject,
issuer_name=crt.issuer,
not_valid_before=new_not_valid_after_date - (
crt.not_valid_after - crt.not_valid_before
utils.getNotValidAfter(crt) - utils.getNotValidBefore(crt)
),
not_valid_after=new_not_valid_after_date,
serial_number=crt.serial_number,
......@@ -833,7 +833,7 @@ class CaucaseTest(TestCase):
'expiration_date=?, crt=? '
'WHERE rowid=?',
(
utils.datetime2timestamp(new_crt.not_valid_after),
utils.datetime2timestamp(utils.getNotValidAfter(new_crt)),
new_crt_pem,
row['rowid'],
),
......@@ -1267,11 +1267,11 @@ class CaucaseTest(TestCase):
# Renewing CRL
self._stopServer()
reference_crl, = self._getClientCRLList()
now = datetime.datetime.utcnow()
now = utils.utcnow()
# x509 certificates have second-level accuracy
now = now.replace(microsecond=0)
# Sanity check: pre-existing CRL creation should be strictly in the past
self.assertLess(reference_crl.last_update, now)
self.assertLess(utils.getLastUpdate(reference_crl), now)
# Bump last_update in the past by a bit over half the CRL lifetime.
SQLite3Storage(
self._server_db,
......@@ -1285,7 +1285,7 @@ class CaucaseTest(TestCase):
# May be equal due to lack of timestamp accuracy.
self.assertLessEqual(
now - caucase.ca.DESYNCHORNISATION_TOLERANCE,
new_crl.last_update,
utils.getLastUpdate(new_crl),
)
def testBadCSR(self):
......@@ -1733,7 +1733,8 @@ class CaucaseTest(TestCase):
new_cau_pem = self._setCACertificateRemainingLifeTime(
'user',
new_cau_crt.serial_number,
new_cau_crt.not_valid_after - new_cau_crt.not_valid_before -
utils.getNotValidAfter(new_cau_crt) -
utils.getNotValidBefore(new_cau_crt) -
datetime.timedelta(days=100),
)
utils.saveCertList(
......@@ -1911,7 +1912,7 @@ class CaucaseTest(TestCase):
) in list(crl_pem_dict.items()):
crl_pem_dict[authority_key_identifier] = (
crl_pem,
datetime.datetime.utcnow() - datetime.timedelta(seconds=1),
utils.utcnow() - datetime.timedelta(seconds=1),
)
cau_ca_list = cau.getCACertificateList()
new_crl_pem_dict = cau.getCertificateRevocationListDict()
......@@ -3013,7 +3014,7 @@ class CaucaseTest(TestCase):
crl.extensions.get_extension_for_class(
x509.CRLNumber,
).value.crl_number,
crl.last_update.isoformat(' '),
utils.getLastUpdate(crl).isoformat(' '),
),
'Revoked 1 certificates (1 were already revoked)',
],
......@@ -3328,7 +3329,7 @@ class CaucaseTest(TestCase):
# Next wakeup should be 7 days before CRL expiration (default delay)
crl, = self._getClientCRLList()
crl_renewal = crl.next_update - datetime.timedelta(days=7)
crl_renewal = utils.getNextUpdate(crl) - datetime.timedelta(days=7)
# Give +/-5 seconds of leeway.
crl_tolerance = datetime.timedelta(seconds=5)
self.assertGreater(
......@@ -3699,7 +3700,7 @@ class CaucaseTest(TestCase):
value TEXT
);
''')
now = datetime.datetime.utcnow().replace(microsecond=0)
now = utils.utcnow().replace(microsecond=0)
ca_lifetime = datetime.timedelta(days=390)
old_ca_expiration_date = now + datetime.timedelta(days=100)
old_ca_key, old_ca_crt = self._getCAKeyPair(
......@@ -3821,7 +3822,7 @@ class CaucaseTest(TestCase):
# (by at least one second) although this is not being tested, and the
# re-generated CRL time should be "now" to justify the crl_number being
# unchanged.
self.assertEqual(crl.last_update, now)
self.assertEqual(utils.getLastUpdate(crl), now)
self.assertEqual(
crl.extensions.get_extension_for_class(
x509.CRLNumber,
......@@ -3831,7 +3832,7 @@ class CaucaseTest(TestCase):
self.assertItemsEqual(
revoked_list,
[
(x.serial_number, x.revocation_date)
(x.serial_number, utils.getRevocationDate(x))
for x in crl
],
)
......
......@@ -652,7 +652,92 @@ def _getAuthorityKeyIdentifier(cert):
x509.AuthorityKeyIdentifier,
).value.key_identifier
EPOCH = datetime.datetime(1970, 1, 1)
try:
_UTC = datetime.timezone.utc
except AttributeError: # pragma: no cover
# BBB
_ZERO_TIMEDELTA = datetime.timedelta(0)
class _UTC(datetime.tzinfo):
"""UTC"""
def utcoffset(self, dt):
return _ZERO_TIMEDELTA
def tzname(self, dt):
return "UTC"
def dst(self, dt):
return _ZERO_TIMEDELTA
def utcnow():
"""
Get a datetime instance of the current time in UTC.
"""
return datetime.datetime.now(_UTC)
if hasattr(x509.Certificate, 'not_valid_after_utc'): # pragma: no cover
def getNotValidAfter(obj):
"""
Return the not_valid_after date from given cyrptography object.
"""
return obj.not_valid_after_utc
def getNotValidBefore(obj):
"""
Return the not_valid_before date from given cyrptography object.
"""
return obj.not_valid_before_utc
def getLastUpdate(obj):
"""
Return the last_update date from given cyrptography object.
"""
return obj.last_update_utc
def getNextUpdate(obj):
"""
Return the next_update date from given cyrptography object.
"""
return obj.next_update_utc
def getRevocationDate(obj):
"""
Return the revocation_date from given cyrptography object.
"""
return obj.revocation_date_utc
else: # pragma: no cover
# BBB
def getNotValidAfter(obj):
"""
Return the not_valid_after date from given cyrptography object.
"""
return obj.not_valid_after
def getNotValidBefore(obj):
"""
Return the not_valid_before date from given cyrptography object.
"""
return obj.not_valid_before
def getLastUpdate(obj):
"""
Return the last_update date from given cyrptography object.
"""
return obj.last_update
def getNextUpdate(obj):
"""
Return the next_update date from given cyrptography object.
"""
return obj.next_update
def getRevocationDate(obj):
"""
Return the revocation_date from given cyrptography object.
"""
return obj.revocation_date
EPOCH = datetime.datetime(1970, 1, 1, tzinfo=_UTC)
def datetime2timestamp(value):
"""
Convert given datetime into a unix timestamp.
......@@ -731,7 +816,7 @@ def until(deadline): # pragma: no cover
"""
try:
while True:
now = datetime.datetime.utcnow()
now = utcnow()
remaining_time = (deadline - now).total_seconds()
if remaining_time <= 0:
break
......@@ -765,7 +850,7 @@ def log_exception(error_file, exc_info, client_address):
try:
print(
'[%s] [pid %s:tid %s] [client %s] %s %s%s' % (
datetime.datetime.utcnow().isoformat(),
utcnow().isoformat(),
os.getpid(),
threading.current_thread().ident,
client_address,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment