-
Notifications
You must be signed in to change notification settings - Fork 89
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for SSH certificates using ecdsa-sk or ed25519-sk public keys #813
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,6 +5,7 @@ | |
# SPDX-License-Identifier: GPL-3.0-or-later | ||
|
||
from __future__ import absolute_import, division, print_function | ||
|
||
__metaclass__ = type | ||
|
||
# Protocol References | ||
|
@@ -14,6 +15,7 @@ | |
# https://datatracker.ietf.org/doc/html/rfc5656 | ||
# https://datatracker.ietf.org/doc/html/rfc8032 | ||
# https://cvsweb.openbsd.org/src/usr.bin/ssh/PROTOCOL.certkeys?annotate=HEAD | ||
# https://cvsweb.openbsd.org/src/usr.bin/ssh/PROTOCOL.u2f?annotate=HEAD | ||
# | ||
# Inspired by: | ||
# ------------ | ||
|
@@ -52,8 +54,13 @@ | |
'ecdsa-nistp384': b"ecdsa-sha2-nistp384", | ||
'ecdsa-nistp521': b"ecdsa-sha2-nistp521", | ||
'ed25519': b"ssh-ed25519", | ||
# FIDO2 hardware keys | ||
'ecdsa-sk': b"sk-ecdsa-sha2-nistp256", | ||
'ed25519-sk': b"sk-ssh-ed25519", | ||
} | ||
_CERT_SUFFIX_V01 = b"[email protected]" | ||
_SK_SUFFIX = b"@openssh.com" | ||
_SK_PREFIX = b"sk-" | ||
|
||
# See https://datatracker.ietf.org/doc/html/rfc5656#section-6.1 | ||
_ECDSA_CURVE_IDENTIFIERS = { | ||
|
@@ -69,7 +76,6 @@ | |
|
||
_USE_TIMEZONE = sys.version_info >= (3, 6) | ||
|
||
|
||
_ALWAYS = _add_or_remove_timezone(datetime(1970, 1, 1), with_timezone=_USE_TIMEZONE) | ||
_FOREVER = datetime(9999, 12, 31, 23, 59, 59, 999999, _UTC) if _USE_TIMEZONE else datetime.max | ||
|
||
|
@@ -145,7 +151,8 @@ def format_datetime(dt, date_format): | |
elif dt == _FOREVER: | ||
result = 'forever' | ||
else: | ||
result = dt.isoformat().replace('+00:00', '') if date_format == 'human_readable' else dt.strftime("%Y%m%d%H%M%S") | ||
result = dt.isoformat().replace('+00:00', '') if date_format == 'human_readable' else dt.strftime( | ||
"%Y%m%d%H%M%S") | ||
elif date_format == 'timestamp': | ||
td = dt - _ALWAYS | ||
result = int((td.microseconds + (td.seconds + td.days * 24 * 3600) * 10 ** 6) / 10 ** 6) | ||
|
@@ -196,7 +203,8 @@ def _time_string_to_datetime(time_string): | |
else: | ||
for time_format in ("%Y-%m-%d", "%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S"): | ||
try: | ||
result = _add_or_remove_timezone(datetime.strptime(time_string, time_format), with_timezone=_USE_TIMEZONE) | ||
result = _add_or_remove_timezone(datetime.strptime(time_string, time_format), | ||
with_timezone=_USE_TIMEZONE) | ||
except ValueError: | ||
pass | ||
if result is None: | ||
|
@@ -279,6 +287,7 @@ def from_string(cls, option_string): | |
@six.add_metaclass(abc.ABCMeta) | ||
class OpensshCertificateInfo: | ||
"""Encapsulates all certificate information which is signed by a CA key""" | ||
|
||
def __init__(self, | ||
nonce=None, | ||
serial=None, | ||
|
@@ -304,6 +313,7 @@ def __init__(self, | |
self.signing_key = signing_key | ||
|
||
self.type_string = None | ||
self.public_key_type_string = None | ||
|
||
@property | ||
def cert_type(self): | ||
|
@@ -326,8 +336,17 @@ def cert_type(self, cert_type): | |
def signing_key_fingerprint(self): | ||
return fingerprint(self.signing_key) | ||
|
||
@abc.abstractmethod | ||
def public_key_fingerprint(self): | ||
if self.public_key_type_string is None: | ||
return b'' | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Returning There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
writer = _OpensshWriter() | ||
|
||
writer.string(self.public_key_type_string) | ||
self.write_public_key_params(writer) | ||
return fingerprint(writer.bytes()) | ||
|
||
@abc.abstractmethod | ||
def write_public_key_params(self, writer): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a breaking API change. You need to remove Also you should add a docstring here explaining what this function is expected to do. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @jnohlgard I think it would be better to just introduce the feature in this PR and move the refactor to a follow up if necessary. As @felixfontein is indicating you would force this feature to wait until the next major release if it includes breaking changes. |
||
pass | ||
|
||
@abc.abstractmethod | ||
|
@@ -339,21 +358,18 @@ class OpensshRSACertificateInfo(OpensshCertificateInfo): | |
def __init__(self, e=None, n=None, **kwargs): | ||
super(OpensshRSACertificateInfo, self).__init__(**kwargs) | ||
self.type_string = _SSH_TYPE_STRINGS['rsa'] + _CERT_SUFFIX_V01 | ||
self.public_key_type_string = _SSH_TYPE_STRINGS['rsa'] | ||
self.e = e | ||
self.n = n | ||
|
||
# See https://datatracker.ietf.org/doc/html/rfc4253#section-6.6 | ||
def public_key_fingerprint(self): | ||
def write_public_key_params(self, writer): | ||
if any([self.e is None, self.n is None]): | ||
return b'' | ||
return | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Returning early here without throwing an exception or any indication of missing data will result in an invalid fingerprint now instead of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same goes for the other concrete implementations. |
||
|
||
writer = _OpensshWriter() | ||
writer.string(_SSH_TYPE_STRINGS['rsa']) | ||
writer.mpint(self.e) | ||
writer.mpint(self.n) | ||
|
||
return fingerprint(writer.bytes()) | ||
|
||
def parse_public_numbers(self, parser): | ||
self.e = parser.mpint() | ||
self.n = parser.mpint() | ||
|
@@ -363,25 +379,22 @@ class OpensshDSACertificateInfo(OpensshCertificateInfo): | |
def __init__(self, p=None, q=None, g=None, y=None, **kwargs): | ||
super(OpensshDSACertificateInfo, self).__init__(**kwargs) | ||
self.type_string = _SSH_TYPE_STRINGS['dsa'] + _CERT_SUFFIX_V01 | ||
self.public_key_type_string = _SSH_TYPE_STRINGS['dsa'] | ||
self.p = p | ||
self.q = q | ||
self.g = g | ||
self.y = y | ||
|
||
# See https://datatracker.ietf.org/doc/html/rfc4253#section-6.6 | ||
def public_key_fingerprint(self): | ||
def write_public_key_params(self, writer): | ||
if any([self.p is None, self.q is None, self.g is None, self.y is None]): | ||
return b'' | ||
return | ||
|
||
writer = _OpensshWriter() | ||
writer.string(_SSH_TYPE_STRINGS['dsa']) | ||
writer.mpint(self.p) | ||
writer.mpint(self.q) | ||
writer.mpint(self.g) | ||
writer.mpint(self.y) | ||
|
||
return fingerprint(writer.bytes()) | ||
|
||
def parse_public_numbers(self, parser): | ||
self.p = parser.mpint() | ||
self.q = parser.mpint() | ||
|
@@ -406,24 +419,21 @@ def curve(self): | |
def curve(self, curve): | ||
if curve in _ECDSA_CURVE_IDENTIFIERS.values(): | ||
self._curve = curve | ||
self.public_key_type_string = _SSH_TYPE_STRINGS[_ECDSA_CURVE_IDENTIFIERS_LOOKUP[curve]] | ||
self.type_string = _SSH_TYPE_STRINGS[_ECDSA_CURVE_IDENTIFIERS_LOOKUP[curve]] + _CERT_SUFFIX_V01 | ||
else: | ||
raise ValueError( | ||
"Curve must be one of %s" % (b','.join(list(_ECDSA_CURVE_IDENTIFIERS.values()))).decode('UTF-8') | ||
) | ||
|
||
# See https://datatracker.ietf.org/doc/html/rfc4253#section-6.6 | ||
def public_key_fingerprint(self): | ||
def write_public_key_params(self, writer): | ||
if any([self.curve is None, self.public_key is None]): | ||
return b'' | ||
return | ||
|
||
writer = _OpensshWriter() | ||
writer.string(_SSH_TYPE_STRINGS[_ECDSA_CURVE_IDENTIFIERS_LOOKUP[self.curve]]) | ||
writer.string(self.curve) | ||
writer.string(self.public_key) | ||
|
||
return fingerprint(writer.bytes()) | ||
|
||
def parse_public_numbers(self, parser): | ||
self.curve = parser.string() | ||
self.public_key = parser.string() | ||
|
@@ -433,25 +443,74 @@ class OpensshED25519CertificateInfo(OpensshCertificateInfo): | |
def __init__(self, pk=None, **kwargs): | ||
super(OpensshED25519CertificateInfo, self).__init__(**kwargs) | ||
self.type_string = _SSH_TYPE_STRINGS['ed25519'] + _CERT_SUFFIX_V01 | ||
self.public_key_type_string = _SSH_TYPE_STRINGS['ed25519'] | ||
self.pk = pk | ||
|
||
def public_key_fingerprint(self): | ||
def write_public_key_params(self, writer): | ||
if self.pk is None: | ||
return b'' | ||
|
||
writer = _OpensshWriter() | ||
writer.string(_SSH_TYPE_STRINGS['ed25519']) | ||
return | ||
writer.string(self.pk) | ||
|
||
return fingerprint(writer.bytes()) | ||
|
||
def parse_public_numbers(self, parser): | ||
self.pk = parser.string() | ||
|
||
|
||
class OpensshSKECDSACertificateInfo(OpensshECDSACertificateInfo): | ||
def __init__(self, application=b"ssh:", curve=None, **kwargs): | ||
if curve is None: | ||
curve = b'nistp256' | ||
super(OpensshSKECDSACertificateInfo, self).__init__(curve=curve, **kwargs) | ||
|
||
self.type_string = _SSH_TYPE_STRINGS['ecdsa-sk'] + _CERT_SUFFIX_V01 | ||
self.public_key_type_string = _SSH_TYPE_STRINGS['ecdsa-sk'] + _SK_SUFFIX | ||
self.application = application | ||
|
||
def write_public_key_params(self, writer): | ||
if self.application is None: | ||
return | ||
|
||
super(OpensshSKECDSACertificateInfo, self).write_public_key_params(writer) | ||
|
||
writer.string(self.application) | ||
|
||
def parse_public_numbers(self, parser): | ||
super(OpensshSKECDSACertificateInfo, self).parse_public_numbers(parser) | ||
|
||
self.application = parser.string() | ||
|
||
@OpensshECDSACertificateInfo.curve.setter | ||
def curve(self, curve): | ||
if curve != b'nistp256': | ||
raise ValueError( | ||
"ecdsa-sk only supports curve nistp256" | ||
) | ||
self._curve = curve | ||
|
||
class OpensshSKED25519CertificateInfo(OpensshED25519CertificateInfo): | ||
def __init__(self, application=b"ssh:", **kwargs): | ||
super(OpensshSKED25519CertificateInfo, self).__init__(**kwargs) | ||
self.type_string = _SSH_TYPE_STRINGS['ed25519-sk'] + _CERT_SUFFIX_V01 | ||
self.public_key_type_string = _SSH_TYPE_STRINGS['ed25519-sk'] + _SK_SUFFIX | ||
self.application = application | ||
|
||
def write_public_key_params(self, writer): | ||
if self.application is None: | ||
return | ||
|
||
super(OpensshSKED25519CertificateInfo, self).write_public_key_params(writer) | ||
|
||
writer.string(self.application) | ||
|
||
def parse_public_numbers(self, parser): | ||
super(OpensshSKED25519CertificateInfo, self).parse_public_numbers(parser) | ||
|
||
self.application = parser.string() | ||
|
||
|
||
# See https://cvsweb.openbsd.org/src/usr.bin/ssh/PROTOCOL.certkeys?annotate=HEAD | ||
class OpensshCertificate(object): | ||
"""Encapsulates a formatted OpenSSH certificate including signature and signing key""" | ||
|
||
def __init__(self, cert_info, signature): | ||
|
||
self._cert_info = cert_info | ||
|
@@ -635,10 +694,14 @@ def get_cert_info_object(key_type): | |
cert_info = OpensshRSACertificateInfo() | ||
elif key_type == 'dsa': | ||
cert_info = OpensshDSACertificateInfo() | ||
elif key_type in ('ecdsa-nistp256', 'ecdsa-nistp384', 'ecdsa-nistp521'): | ||
elif key_type in _ECDSA_CURVE_IDENTIFIERS: | ||
cert_info = OpensshECDSACertificateInfo() | ||
elif key_type == 'ed25519': | ||
cert_info = OpensshED25519CertificateInfo() | ||
elif key_type == 'ecdsa-sk': | ||
cert_info = OpensshSKECDSACertificateInfo() | ||
elif key_type == 'ed25519-sk': | ||
cert_info = OpensshSKED25519CertificateInfo() | ||
else: | ||
raise ValueError("%s is not a valid key type" % key_type) | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -87,6 +87,7 @@ def secure_write(path, mode, content): | |
class OpensshParser(object): | ||
"""Parser for OpenSSH encoded objects""" | ||
BOOLEAN_OFFSET = 1 | ||
UINT8_OFFSET = 1 | ||
UINT32_OFFSET = 4 | ||
UINT64_OFFSET = 8 | ||
|
||
|
@@ -104,6 +105,13 @@ def boolean(self): | |
self._pos = next_pos | ||
return value | ||
|
||
def uint8(self): | ||
next_pos = self._check_position(self.UINT8_OFFSET) | ||
|
||
value = _UBYTE.unpack(self._data[self._pos:next_pos])[0] | ||
self._pos = next_pos | ||
return value | ||
|
||
def uint32(self): | ||
next_pos = self._check_position(self.UINT32_OFFSET) | ||
|
||
|
@@ -189,7 +197,6 @@ def signature_data(cls, signature_string): | |
signature_type = parser.string() | ||
signature_blob = parser.string() | ||
|
||
blob_parser = cls(signature_blob) | ||
if signature_type in (b'ssh-rsa', b'rsa-sha2-256', b'rsa-sha2-512'): | ||
# https://datatracker.ietf.org/doc/html/rfc4253#section-6.6 | ||
# https://datatracker.ietf.org/doc/html/rfc8332#section-3 | ||
|
@@ -198,17 +205,22 @@ def signature_data(cls, signature_string): | |
# https://datatracker.ietf.org/doc/html/rfc4253#section-6.6 | ||
signature_data['r'] = cls._big_int(signature_blob[:20], "big") | ||
signature_data['s'] = cls._big_int(signature_blob[20:], "big") | ||
elif signature_type in (b'ecdsa-sha2-nistp256', b'ecdsa-sha2-nistp384', b'ecdsa-sha2-nistp521'): | ||
elif signature_type in (b'ecdsa-sha2-nistp256', b'ecdsa-sha2-nistp384', b'ecdsa-sha2-nistp521', b'[email protected]'): | ||
# https://datatracker.ietf.org/doc/html/rfc5656#section-3.1.2 | ||
blob_parser = cls(signature_blob) | ||
signature_data['r'] = blob_parser.mpint() | ||
signature_data['s'] = blob_parser.mpint() | ||
elif signature_type == b'ssh-ed25519': | ||
elif signature_type in (b'ssh-ed25519', b'[email protected]'): | ||
# https://datatracker.ietf.org/doc/html/rfc8032#section-5.1.2 | ||
signature_data['R'] = cls._big_int(signature_blob[:32], "little") | ||
signature_data['S'] = cls._big_int(signature_blob[32:], "little") | ||
else: | ||
raise ValueError("%s is not a valid signature type" % signature_type) | ||
|
||
if signature_type.startswith(b'sk-'): | ||
signature_data['signature_flags'] = parser.uint8() | ||
signature_data['signature_counter'] = parser.uint32() | ||
|
||
signature_data['signature_type'] = signature_type | ||
|
||
return signature_data | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would you mind undoing reformatting changes to unrelated code?