Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for SSH certificates using ecdsa-sk or ed25519-sk public keys #813

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 93 additions & 30 deletions plugins/module_utils/openssh/certificate.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
# SPDX-License-Identifier: GPL-3.0-or-later

from __future__ import absolute_import, division, print_function

__metaclass__ = type

# Protocol References
Expand All @@ -14,6 +15,7 @@
# https://datatracker.ietf.org/doc/html/rfc5656
# https://datatracker.ietf.org/doc/html/rfc8032
# https://cvsweb.openbsd.org/src/usr.bin/ssh/PROTOCOL.certkeys?annotate=HEAD
# https://cvsweb.openbsd.org/src/usr.bin/ssh/PROTOCOL.u2f?annotate=HEAD
#
# Inspired by:
# ------------
Expand Down Expand Up @@ -52,8 +54,13 @@
'ecdsa-nistp384': b"ecdsa-sha2-nistp384",
'ecdsa-nistp521': b"ecdsa-sha2-nistp521",
'ed25519': b"ssh-ed25519",
# FIDO2 hardware keys
'ecdsa-sk': b"sk-ecdsa-sha2-nistp256",
'ed25519-sk': b"sk-ssh-ed25519",
}
_CERT_SUFFIX_V01 = b"[email protected]"
_SK_SUFFIX = b"@openssh.com"
_SK_PREFIX = b"sk-"

# See https://datatracker.ietf.org/doc/html/rfc5656#section-6.1
_ECDSA_CURVE_IDENTIFIERS = {
Expand All @@ -69,7 +76,6 @@

_USE_TIMEZONE = sys.version_info >= (3, 6)


_ALWAYS = _add_or_remove_timezone(datetime(1970, 1, 1), with_timezone=_USE_TIMEZONE)
_FOREVER = datetime(9999, 12, 31, 23, 59, 59, 999999, _UTC) if _USE_TIMEZONE else datetime.max

Expand Down Expand Up @@ -145,7 +151,8 @@ def format_datetime(dt, date_format):
elif dt == _FOREVER:
result = 'forever'
else:
result = dt.isoformat().replace('+00:00', '') if date_format == 'human_readable' else dt.strftime("%Y%m%d%H%M%S")
result = dt.isoformat().replace('+00:00', '') if date_format == 'human_readable' else dt.strftime(
"%Y%m%d%H%M%S")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you mind undoing reformatting changes to unrelated code?

elif date_format == 'timestamp':
td = dt - _ALWAYS
result = int((td.microseconds + (td.seconds + td.days * 24 * 3600) * 10 ** 6) / 10 ** 6)
Expand Down Expand Up @@ -196,7 +203,8 @@ def _time_string_to_datetime(time_string):
else:
for time_format in ("%Y-%m-%d", "%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S"):
try:
result = _add_or_remove_timezone(datetime.strptime(time_string, time_format), with_timezone=_USE_TIMEZONE)
result = _add_or_remove_timezone(datetime.strptime(time_string, time_format),
with_timezone=_USE_TIMEZONE)
except ValueError:
pass
if result is None:
Expand Down Expand Up @@ -279,6 +287,7 @@ def from_string(cls, option_string):
@six.add_metaclass(abc.ABCMeta)
class OpensshCertificateInfo:
"""Encapsulates all certificate information which is signed by a CA key"""

def __init__(self,
nonce=None,
serial=None,
Expand All @@ -304,6 +313,7 @@ def __init__(self,
self.signing_key = signing_key

self.type_string = None
self.public_key_type_string = None

@property
def cert_type(self):
Expand All @@ -326,8 +336,17 @@ def cert_type(self, cert_type):
def signing_key_fingerprint(self):
return fingerprint(self.signing_key)

@abc.abstractmethod
def public_key_fingerprint(self):
if self.public_key_type_string is None:
return b''
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Returning b'' instead of None is also an API breakage, I would add a docstring that specifies that None will change to b'' in community.crypto 3.0.0.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

b'' is actually consistent with the existing behavior.self.public_key_type_string is a new property added with this refactor which is why there is a None check now.

writer = _OpensshWriter()

writer.string(self.public_key_type_string)
self.write_public_key_params(writer)
return fingerprint(writer.bytes())

@abc.abstractmethod
def write_public_key_params(self, writer):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a breaking API change. You need to remove @abc.abstractmethod and add in a docstring that in community.crypto 3.0.0 this will become an abstract method.

Also you should add a docstring here explaining what this function is expected to do.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jnohlgard I think it would be better to just introduce the feature in this PR and move the refactor to a follow up if necessary. As @felixfontein is indicating you would force this feature to wait until the next major release if it includes breaking changes.

pass

@abc.abstractmethod
Expand All @@ -339,21 +358,18 @@ class OpensshRSACertificateInfo(OpensshCertificateInfo):
def __init__(self, e=None, n=None, **kwargs):
super(OpensshRSACertificateInfo, self).__init__(**kwargs)
self.type_string = _SSH_TYPE_STRINGS['rsa'] + _CERT_SUFFIX_V01
self.public_key_type_string = _SSH_TYPE_STRINGS['rsa']
self.e = e
self.n = n

# See https://datatracker.ietf.org/doc/html/rfc4253#section-6.6
def public_key_fingerprint(self):
def write_public_key_params(self, writer):
if any([self.e is None, self.n is None]):
return b''
return
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Returning early here without throwing an exception or any indication of missing data will result in an invalid fingerprint now instead of b''. So if you really want this refactor I would throw an exception and catch it in public_key_fingerprint to handle the error properly.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same goes for the other concrete implementations.


writer = _OpensshWriter()
writer.string(_SSH_TYPE_STRINGS['rsa'])
writer.mpint(self.e)
writer.mpint(self.n)

return fingerprint(writer.bytes())

def parse_public_numbers(self, parser):
self.e = parser.mpint()
self.n = parser.mpint()
Expand All @@ -363,25 +379,22 @@ class OpensshDSACertificateInfo(OpensshCertificateInfo):
def __init__(self, p=None, q=None, g=None, y=None, **kwargs):
super(OpensshDSACertificateInfo, self).__init__(**kwargs)
self.type_string = _SSH_TYPE_STRINGS['dsa'] + _CERT_SUFFIX_V01
self.public_key_type_string = _SSH_TYPE_STRINGS['dsa']
self.p = p
self.q = q
self.g = g
self.y = y

# See https://datatracker.ietf.org/doc/html/rfc4253#section-6.6
def public_key_fingerprint(self):
def write_public_key_params(self, writer):
if any([self.p is None, self.q is None, self.g is None, self.y is None]):
return b''
return

writer = _OpensshWriter()
writer.string(_SSH_TYPE_STRINGS['dsa'])
writer.mpint(self.p)
writer.mpint(self.q)
writer.mpint(self.g)
writer.mpint(self.y)

return fingerprint(writer.bytes())

def parse_public_numbers(self, parser):
self.p = parser.mpint()
self.q = parser.mpint()
Expand All @@ -406,24 +419,21 @@ def curve(self):
def curve(self, curve):
if curve in _ECDSA_CURVE_IDENTIFIERS.values():
self._curve = curve
self.public_key_type_string = _SSH_TYPE_STRINGS[_ECDSA_CURVE_IDENTIFIERS_LOOKUP[curve]]
self.type_string = _SSH_TYPE_STRINGS[_ECDSA_CURVE_IDENTIFIERS_LOOKUP[curve]] + _CERT_SUFFIX_V01
else:
raise ValueError(
"Curve must be one of %s" % (b','.join(list(_ECDSA_CURVE_IDENTIFIERS.values()))).decode('UTF-8')
)

# See https://datatracker.ietf.org/doc/html/rfc4253#section-6.6
def public_key_fingerprint(self):
def write_public_key_params(self, writer):
if any([self.curve is None, self.public_key is None]):
return b''
return

writer = _OpensshWriter()
writer.string(_SSH_TYPE_STRINGS[_ECDSA_CURVE_IDENTIFIERS_LOOKUP[self.curve]])
writer.string(self.curve)
writer.string(self.public_key)

return fingerprint(writer.bytes())

def parse_public_numbers(self, parser):
self.curve = parser.string()
self.public_key = parser.string()
Expand All @@ -433,25 +443,74 @@ class OpensshED25519CertificateInfo(OpensshCertificateInfo):
def __init__(self, pk=None, **kwargs):
super(OpensshED25519CertificateInfo, self).__init__(**kwargs)
self.type_string = _SSH_TYPE_STRINGS['ed25519'] + _CERT_SUFFIX_V01
self.public_key_type_string = _SSH_TYPE_STRINGS['ed25519']
self.pk = pk

def public_key_fingerprint(self):
def write_public_key_params(self, writer):
if self.pk is None:
return b''

writer = _OpensshWriter()
writer.string(_SSH_TYPE_STRINGS['ed25519'])
return
writer.string(self.pk)

return fingerprint(writer.bytes())

def parse_public_numbers(self, parser):
self.pk = parser.string()


class OpensshSKECDSACertificateInfo(OpensshECDSACertificateInfo):
def __init__(self, application=b"ssh:", curve=None, **kwargs):
if curve is None:
curve = b'nistp256'
super(OpensshSKECDSACertificateInfo, self).__init__(curve=curve, **kwargs)

self.type_string = _SSH_TYPE_STRINGS['ecdsa-sk'] + _CERT_SUFFIX_V01
self.public_key_type_string = _SSH_TYPE_STRINGS['ecdsa-sk'] + _SK_SUFFIX
self.application = application

def write_public_key_params(self, writer):
if self.application is None:
return

super(OpensshSKECDSACertificateInfo, self).write_public_key_params(writer)

writer.string(self.application)

def parse_public_numbers(self, parser):
super(OpensshSKECDSACertificateInfo, self).parse_public_numbers(parser)

self.application = parser.string()

@OpensshECDSACertificateInfo.curve.setter
def curve(self, curve):
if curve != b'nistp256':
raise ValueError(
"ecdsa-sk only supports curve nistp256"
)
self._curve = curve

class OpensshSKED25519CertificateInfo(OpensshED25519CertificateInfo):
def __init__(self, application=b"ssh:", **kwargs):
super(OpensshSKED25519CertificateInfo, self).__init__(**kwargs)
self.type_string = _SSH_TYPE_STRINGS['ed25519-sk'] + _CERT_SUFFIX_V01
self.public_key_type_string = _SSH_TYPE_STRINGS['ed25519-sk'] + _SK_SUFFIX
self.application = application

def write_public_key_params(self, writer):
if self.application is None:
return

super(OpensshSKED25519CertificateInfo, self).write_public_key_params(writer)

writer.string(self.application)

def parse_public_numbers(self, parser):
super(OpensshSKED25519CertificateInfo, self).parse_public_numbers(parser)

self.application = parser.string()


# See https://cvsweb.openbsd.org/src/usr.bin/ssh/PROTOCOL.certkeys?annotate=HEAD
class OpensshCertificate(object):
"""Encapsulates a formatted OpenSSH certificate including signature and signing key"""

def __init__(self, cert_info, signature):

self._cert_info = cert_info
Expand Down Expand Up @@ -635,10 +694,14 @@ def get_cert_info_object(key_type):
cert_info = OpensshRSACertificateInfo()
elif key_type == 'dsa':
cert_info = OpensshDSACertificateInfo()
elif key_type in ('ecdsa-nistp256', 'ecdsa-nistp384', 'ecdsa-nistp521'):
elif key_type in _ECDSA_CURVE_IDENTIFIERS:
cert_info = OpensshECDSACertificateInfo()
elif key_type == 'ed25519':
cert_info = OpensshED25519CertificateInfo()
elif key_type == 'ecdsa-sk':
cert_info = OpensshSKECDSACertificateInfo()
elif key_type == 'ed25519-sk':
cert_info = OpensshSKED25519CertificateInfo()
else:
raise ValueError("%s is not a valid key type" % key_type)

Expand Down
18 changes: 15 additions & 3 deletions plugins/module_utils/openssh/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ def secure_write(path, mode, content):
class OpensshParser(object):
"""Parser for OpenSSH encoded objects"""
BOOLEAN_OFFSET = 1
UINT8_OFFSET = 1
UINT32_OFFSET = 4
UINT64_OFFSET = 8

Expand All @@ -104,6 +105,13 @@ def boolean(self):
self._pos = next_pos
return value

def uint8(self):
next_pos = self._check_position(self.UINT8_OFFSET)

value = _UBYTE.unpack(self._data[self._pos:next_pos])[0]
self._pos = next_pos
return value

def uint32(self):
next_pos = self._check_position(self.UINT32_OFFSET)

Expand Down Expand Up @@ -189,7 +197,6 @@ def signature_data(cls, signature_string):
signature_type = parser.string()
signature_blob = parser.string()

blob_parser = cls(signature_blob)
if signature_type in (b'ssh-rsa', b'rsa-sha2-256', b'rsa-sha2-512'):
# https://datatracker.ietf.org/doc/html/rfc4253#section-6.6
# https://datatracker.ietf.org/doc/html/rfc8332#section-3
Expand All @@ -198,17 +205,22 @@ def signature_data(cls, signature_string):
# https://datatracker.ietf.org/doc/html/rfc4253#section-6.6
signature_data['r'] = cls._big_int(signature_blob[:20], "big")
signature_data['s'] = cls._big_int(signature_blob[20:], "big")
elif signature_type in (b'ecdsa-sha2-nistp256', b'ecdsa-sha2-nistp384', b'ecdsa-sha2-nistp521'):
elif signature_type in (b'ecdsa-sha2-nistp256', b'ecdsa-sha2-nistp384', b'ecdsa-sha2-nistp521', b'[email protected]'):
# https://datatracker.ietf.org/doc/html/rfc5656#section-3.1.2
blob_parser = cls(signature_blob)
signature_data['r'] = blob_parser.mpint()
signature_data['s'] = blob_parser.mpint()
elif signature_type == b'ssh-ed25519':
elif signature_type in (b'ssh-ed25519', b'[email protected]'):
# https://datatracker.ietf.org/doc/html/rfc8032#section-5.1.2
signature_data['R'] = cls._big_int(signature_blob[:32], "little")
signature_data['S'] = cls._big_int(signature_blob[32:], "little")
else:
raise ValueError("%s is not a valid signature type" % signature_type)

if signature_type.startswith(b'sk-'):
signature_data['signature_flags'] = parser.uint8()
signature_data['signature_counter'] = parser.uint32()

signature_data['signature_type'] = signature_type

return signature_data
Expand Down
Loading