From 411606799037b47008ec736f068bc8aaf02758a4 Mon Sep 17 00:00:00 2001 From: Zeph / Liz Loss-Cutler-Hull Date: Wed, 14 Sep 2022 07:33:35 -0700 Subject: [PATCH 01/58] Add support for updating an authenticator. With this, you can update the user component just by calling add_authenticator with the same credential_id but a different user. This will be a little more valuable in a later commit. --- gimme_aws_creds/registered_authenticators.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/gimme_aws_creds/registered_authenticators.py b/gimme_aws_creds/registered_authenticators.py index 84f2ea12..3805aea4 100644 --- a/gimme_aws_creds/registered_authenticators.py +++ b/gimme_aws_creds/registered_authenticators.py @@ -39,11 +39,21 @@ def add_authenticator(self, credential_id, user): :param user: a user identifier (email, name, uid, ...) :type user: str """ + found = False + new_authenticators = [] authenticators = self._get_authenticators() - authenticators.append(RegisteredAuthenticator(credential_id=credential_id, user=user)) + for authenticator in authenticators: + if authenticator.matches(credential_id): + found = True + new_authenticators.append(RegisteredAuthenticator(credential_id=credential_id, user=user)) + else: + new_authenticators.append(authenticator) + + if found is False: + new_authenticators.append(RegisteredAuthenticator(credential_id=credential_id, user=user)) with open(self._json_path, 'w') as f: - json.dump(authenticators, f) + json.dump(new_authenticators, f) def get_authenticator_user(self, credential_id): """ From f91c866bbc0e02fd8f35e6e25c213ab27555c280 Mon Sep 17 00:00:00 2001 From: Zeph / Liz Loss-Cutler-Hull Date: Wed, 14 Sep 2022 07:50:12 -0700 Subject: [PATCH 02/58] Allow registering an alias for authenticators. Because sometimes, you have multiple Webauthn tokens and the user wants to give each a name of some sort. --- gimme_aws_creds/registered_authenticators.py | 24 ++++++++++++-------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/gimme_aws_creds/registered_authenticators.py b/gimme_aws_creds/registered_authenticators.py index 3805aea4..7e27e6b1 100644 --- a/gimme_aws_creds/registered_authenticators.py +++ b/gimme_aws_creds/registered_authenticators.py @@ -32,12 +32,14 @@ def _create_file_if_necessary(path): with open(path, 'w') as f: json.dump([], f) - def add_authenticator(self, credential_id, user): + def add_authenticator(self, credential_id, user, alias): """ :param credential_id: the id of added authenticator credential :type credential_id: bytes :param user: a user identifier (email, name, uid, ...) + :param alias: a user provided alias for the authenticator :type user: str + :type alias: str """ found = False new_authenticators = [] @@ -45,12 +47,12 @@ def add_authenticator(self, credential_id, user): for authenticator in authenticators: if authenticator.matches(credential_id): found = True - new_authenticators.append(RegisteredAuthenticator(credential_id=credential_id, user=user)) + new_authenticators.append(RegisteredAuthenticator(credential_id=credential_id, user=user, alias=alias)) else: new_authenticators.append(authenticator) if found is False: - new_authenticators.append(RegisteredAuthenticator(credential_id=credential_id, user=user)) + new_authenticators.append(RegisteredAuthenticator(credential_id=credential_id, user=user, alias=alias)) with open(self._json_path, 'w') as f: json.dump(new_authenticators, f) @@ -59,15 +61,17 @@ def get_authenticator_user(self, credential_id): """ :param credential_id: the id of the authenticator's credential :type credential_id: bytes - :return: user identifier, if credential id was registered by gimme-aws-creds, or None - :rtype: str + :returns: + -user (:py:class:`str`) - user identifier, if credential id was registered by gimme-aws-creds, or None + -alias (:py:class:`str`) - user provided alias for authenticator, or None + :rtype: (str, str) """ authenticators = self._get_authenticators() for authenticator in authenticators: if authenticator.matches(credential_id): - return authenticator.user + return authenticator.user, authenticator.alias - return None + return None, None def _get_authenticators(self): with open(self._json_path) as f: @@ -80,16 +84,18 @@ class RegisteredAuthenticator(dict): An entry in the registered authenticators json file, which holds a hashed credential id, and its user id. """ - def __init__(self, credential_id=None, credential_id_hash=None, user=None): + def __init__(self, credential_id=None, credential_id_hash=None, user=None, alias=None): """ :type credential_id: bytes :type user: str + :type alias: str """ credential_id_hash = credential_id_hash or self._hash_credential_id(credential_id) - super().__init__(credential_id_hash=credential_id_hash, user=user) + super().__init__(credential_id_hash=credential_id_hash, user=user, alias=alias) self.credential_id_hash = credential_id_hash self.user = user + self.alias = alias def matches(self, credential_id): return self.credential_id_hash == self._hash_credential_id(credential_id) From 6cad7eb78e66c5604be53fd039b5e3b545a3ca8c Mon Sep 17 00:00:00 2001 From: Zeph / Liz Loss-Cutler-Hull Date: Wed, 14 Sep 2022 07:51:19 -0700 Subject: [PATCH 03/58] Aliases for webauthn authenticators. Building on the addition to registered authenticators, this adds support for adding, and using, aliases on authenticators. There are two points where we register an alias, the first is on device registration, and the second is on successful use. On the successful use case, we check to see if we already have an alias on the authenticator in question, and if we do not, we prompt for one. This makes it possible to track webauthn authenticators which were not registered via gimme_aws_creds. --- gimme_aws_creds/main.py | 5 ++++- gimme_aws_creds/okta.py | 19 +++++++++++++++++-- 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/gimme_aws_creds/main.py b/gimme_aws_creds/main.py index 600e783d..fffabd08 100644 --- a/gimme_aws_creds/main.py +++ b/gimme_aws_creds/main.py @@ -895,7 +895,10 @@ def handle_setup_fido_authenticator(self): self.okta.set_preferred_mfa_type(None) credential_id, user = self.okta.setup_fido_authenticator() + alias = self.ui.input('Alias for webauthn token: ') + if alias == "": + alias = None registered_authenticators = RegisteredAuthenticators(self.ui) - registered_authenticators.add_authenticator(credential_id, user) + registered_authenticators.add_authenticator(credential_id, user, alias) raise errors.GimmeAWSCredsExitSuccess() diff --git a/gimme_aws_creds/okta.py b/gimme_aws_creds/okta.py index 83875d72..c2926a67 100644 --- a/gimme_aws_creds/okta.py +++ b/gimme_aws_creds/okta.py @@ -681,6 +681,14 @@ def _check_webauthn_result(self, state_token, login_data): response_data = response.json() if 'status' in response_data and response_data['status'] == 'SUCCESS': + decoded = websafe_decode(credential_id) + registered_authenticators = RegisteredAuthenticators(self.ui) + user_name, alias = registered_authenticators.get_authenticator_user(decoded) + if alias is None: + alias = self.ui.input('Alias for webauthn token: ') + if alias is not None and alias != "": + registered_authenticators.add_authenticator(decoded, user_name, alias) + if 'stateToken' in response_data: return {'stateToken': response_data['stateToken'], 'apiResponse': response_data} if 'sessionToken' in response_data: @@ -843,8 +851,15 @@ def _build_factor_name(self, factor): try: registered_authenticators = RegisteredAuthenticators(self.ui) credential_id = websafe_decode(factor['profile']['credentialId']) - factor_name = registered_authenticators.get_authenticator_user(credential_id) - except Exception: + user_name, alias = registered_authenticators.get_authenticator_user(credential_id) + if user_name is not None and alias is not None: + factor_name = "{}: {}".format(user_name, alias) + elif user_name is not None: + factor_name = user_name + elif alias is not None: + factor_name = alias + except Exception as e: + self.ui.info("Error getting authenticator name: {}".format(e)) pass default_factor_name = factor['profile'].get('authenticatorName') or factor['factorType'] From 0de7e23472d91db6fae5802a8631aaceff88fcad Mon Sep 17 00:00:00 2001 From: Zeph / Liz Loss-Cutler-Hull Date: Wed, 14 Sep 2022 07:53:10 -0700 Subject: [PATCH 04/58] Print a message after registering webauthn touch. Because there can be a noticible delay between the touch, and our next visible step, print a message to confirm that the touch was regisetred. This should help prevent things like accidentally triggering the Yubikey OTP output string, for users who don't notice that their device stopped blinking. (Like me, I definitely did that.) --- gimme_aws_creds/webauthn.py | 1 + 1 file changed, 1 insertion(+) diff --git a/gimme_aws_creds/webauthn.py b/gimme_aws_creds/webauthn.py index b240186b..4c57bbf8 100644 --- a/gimme_aws_creds/webauthn.py +++ b/gimme_aws_creds/webauthn.py @@ -85,6 +85,7 @@ def _verify(self, client): assertion_selection = client.get_assertion(options, event=self._event, on_keepalive=self.on_keepalive, pin=pin) + self.ui.info('Processing...\n') self._assertions = assertion_selection.get_assertions() assert len(self._assertions) >= 0 From e8a2aa637feb4211a7eb2e978f03e4e003a0b22f Mon Sep 17 00:00:00 2001 From: Zeph / Liz Loss-Cutler-Hull Date: Wed, 14 Sep 2022 08:18:43 -0700 Subject: [PATCH 05/58] Add an optional use_keyring config option. If absent, or y, then we will use a system keyring if available. If it is n, then we will not use a system keyring even if it is available. This should prevent continual prompts for storing it in the system keyring for people who either do not wish to store it there, or for people whose corporate policies do not allow them to store it there. --- gimme_aws_creds/config.py | 16 ++++++++++++++++ gimme_aws_creds/main.py | 6 ++++++ gimme_aws_creds/okta.py | 13 ++++++++++--- 3 files changed, 32 insertions(+), 3 deletions(-) diff --git a/gimme_aws_creds/config.py b/gimme_aws_creds/config.py index cbd99d53..0a160633 100644 --- a/gimme_aws_creds/config.py +++ b/gimme_aws_creds/config.py @@ -225,6 +225,7 @@ def update_config_file(self): aws_default_duration = Default AWS session duration (3600) preferred_mfa_type = Select this MFA device type automatically include_path - (optional) includes that full role path to the role name for profile + use_keyring - Enables or disables use of the system keyring """ config = configparser.ConfigParser() @@ -249,6 +250,7 @@ def update_config_file(self): 'aws_default_duration': '3600', 'device_token': '', 'output_format': 'export', + 'use_keyring': 'y', } # See if a config file already exists. @@ -293,6 +295,8 @@ def update_config_file(self): else: config_dict['cred_profile'] = defaults['cred_profile'] + config_dict['use_keyring'] = self.get_use_keyring(defaults['use_keyring']) + self.write_config_file(config_dict) def write_config_file(self, config_dict): @@ -528,6 +532,18 @@ def _get_remember_device(self, default_entry): except ValueError: ui.default.warning("Remember the MFA device must be either y or n.") + def _get_use_keyring(self, default_entry): + """Option to use the system keyring""" + ui.default.message( + "Do you want to use the system keyring?\n" + "Please answer y or n.") + while True: + try: + return self._get_user_input_yes_no( + "Use system keyring", default_entry) + except ValueError: + ui.default.warning("Remember the value must be either y or n.") + def _get_user_input(self, message, default=None): """formats message to include default and then prompts user for input via keyboard with message. Returns user's input or if user doesn't diff --git a/gimme_aws_creds/main.py b/gimme_aws_creds/main.py index fffabd08..8015f7ad 100644 --- a/gimme_aws_creds/main.py +++ b/gimme_aws_creds/main.py @@ -556,6 +556,12 @@ def okta(self): okta.set_remember_device(self.config.remember_device or self.conf_dict.get('remember_device', False)) + + if self.conf_dict.get('use_keyring') in ('n', 'false', 'False'): + okta.set_use_keyring(False) + else: + okta.set_use_keyring(True) + return okta def get_resolver(self): diff --git a/gimme_aws_creds/okta.py b/gimme_aws_creds/okta.py index c2926a67..90bf45c1 100644 --- a/gimme_aws_creds/okta.py +++ b/gimme_aws_creds/okta.py @@ -65,6 +65,7 @@ def __init__(self, gac_ui, okta_org_url, verify_ssl_certs=True, device_token=Non self._preferred_mfa_type = None self._mfa_code = None self._remember_device = None + self._use_keyring = self.KEYRING_ENABLED self._use_oauth_access_token = False self._use_oauth_id_token = False @@ -108,6 +109,12 @@ def set_mfa_code(self, mfa_code): def set_remember_device(self, remember_device): self._remember_device = bool(remember_device) + def set_use_keyring(self, use_keyring): + if self.KEYRING_ENABLED: + self._use_keyring = bool(use_keyring) + else: + self._use_keyring = False + def use_oauth_access_token(self, val=True): self._use_oauth_access_token = val @@ -351,7 +358,7 @@ def _login_username_password(self, state_token, url): # ref: https://developer.okta.com/docs/reference/error-codes/#example-errors-listed-by-http-return-code elif response.status_code in [400, 401, 403, 404, 409, 429, 500, 501, 503]: if response_data['errorCode'] == "E0000004": - if self.KEYRING_ENABLED: + if self._use_keyring: try: self.ui.info("Stored password is invalid, clearing. Please try again") keyring.delete_password(self.KEYRING_SERVICE, creds['username']) @@ -881,7 +888,7 @@ def _get_username_password_creds(self): username = self._username password = self._password - if not password and self.KEYRING_ENABLED: + if not password and self._use_keyring: try: # If the OS supports a keyring, offer to save the password password = keyring.get_password(self.KEYRING_SERVICE, username) @@ -897,7 +904,7 @@ def _get_username_password_creds(self): if len(password) > 0: break - if self.KEYRING_ENABLED: + if self._use_keyring: # If the OS supports a keyring, offer to save the password if self.ui.input("Do you want to save this password in the keyring? (y/N) ") == 'y': try: From e6fae51197aa6f88ba4d840a4cde5cd672b94ed9 Mon Sep 17 00:00:00 2001 From: Zeph / Liz Loss-Cutler-Hull Date: Wed, 14 Sep 2022 08:20:05 -0700 Subject: [PATCH 06/58] Add newline at end of file. --- gimme_aws_creds/config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gimme_aws_creds/config.py b/gimme_aws_creds/config.py index 0a160633..7c7d28c1 100644 --- a/gimme_aws_creds/config.py +++ b/gimme_aws_creds/config.py @@ -594,4 +594,4 @@ def fail_if_profile_not_found(self, profile_config, conf_profile, default_sectio """ if not profile_config and conf_profile == default_section: raise errors.GimmeAWSCredsError( - 'DEFAULT profile is missing! This is profile is required when not using --profile') \ No newline at end of file + 'DEFAULT profile is missing! This is profile is required when not using --profile') From e77ad2751ef55c0ed31f382515c240a93d476d7c Mon Sep 17 00:00:00 2001 From: Zeph / Liz Loss-Cutler-Hull Date: Wed, 14 Sep 2022 08:36:13 -0700 Subject: [PATCH 07/58] Add use_keyring to README.md --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 75b3442d..255d68e5 100644 --- a/README.md +++ b/README.md @@ -122,6 +122,7 @@ A configuration wizard will prompt you to enter the necessary configuration para - include_path - (optional) Includes full role path to the role name in AWS credential profile name. (default: n). If `y`: `-/some/path/administrator`. If `n`: `-administrator` - remember_device - y or n. If yes, the MFA device will be remembered by Okta service for a limited time. This option can also be set interactively in the command line using `-m` or `--remember-device` - output_format - `json` or `export`, determines default credential output format, can be also specified by `--output-format FORMAT` and `-o FORMAT`. +- use_keyring - y or n. Defaults to y. If n, use of the system keyring for password storage is disabled. ## Configuration File From da12a0fe157293873866d703dea00c49ac18a47b Mon Sep 17 00:00:00 2001 From: Zeph / Liz Loss-Cutler-Hull Date: Wed, 14 Sep 2022 10:30:50 -0700 Subject: [PATCH 08/58] Replace nosetests with pytest. A lot of search/replace made this go so much easier. --- Makefile | 2 +- requirements_dev.txt | 2 +- tests/test_aws_resolver.py | 25 ++++++++++---------- tests/test_okta_client.py | 47 +++++++++++++++++++------------------- 4 files changed, 37 insertions(+), 39 deletions(-) diff --git a/Makefile b/Makefile index 0e4f78bb..64c6b7c1 100644 --- a/Makefile +++ b/Makefile @@ -5,4 +5,4 @@ docker-build: docker build -t gimme-aws-creds . test: docker-build - nosetests -vv tests + pytest -v tests diff --git a/requirements_dev.txt b/requirements_dev.txt index 1b87f02f..1fbe1748 100644 --- a/requirements_dev.txt +++ b/requirements_dev.txt @@ -1,3 +1,3 @@ -r requirements.txt responses>=0.5.1,<1.0.0 -nose>=1.3.7 \ No newline at end of file +pytest diff --git a/tests/test_aws_resolver.py b/tests/test_aws_resolver.py index 59b528c2..5fc70be7 100644 --- a/tests/test_aws_resolver.py +++ b/tests/test_aws_resolver.py @@ -6,7 +6,6 @@ import requests import responses -from nose.tools import assert_equals import gimme_aws_creds.common as common_def from gimme_aws_creds.aws import AwsResolver @@ -2087,11 +2086,11 @@ def test_enumerate_saml_roles(self): """Test parsing the roles from SAML assrtion & AwsSigninPage""" responses.add(responses.POST, 'https://signin.aws.amazon.com/saml', status=200, body=self.aws_signinpage) result = self.resolver._enumerate_saml_roles(self.saml, 'https://signin.aws.amazon.com/saml') - assert_equals(result[0], self.roles[0]) - assert_equals(result[1], self.roles[1]) - assert_equals(result[2], self.roles[2]) - assert_equals(result[3], self.roles[3]) - assert_equals(result[4], self.roles[4]) + assert result[0] == self.roles[0] + assert result[1] == self.roles[1] + assert result[2] == self.roles[2] + assert result[3] == self.roles[3] + assert result[4] == self.roles[4] def test_display_role(self): """Test the roles are well displayed (grouped/indented by account)""" @@ -2105,11 +2104,11 @@ def test_display_role(self): self.display_role.append(' [ 4 ]: testrole5') result = self.resolver._display_role(self.roles) - assert_equals(result[0], self.display_role[0]) - assert_equals(result[1], self.display_role[1]) - assert_equals(result[2], self.display_role[2]) - assert_equals(result[3], self.display_role[3]) - assert_equals(result[4], self.display_role[4]) - assert_equals(result[5], self.display_role[5]) - assert_equals(result[6], self.display_role[6]) + assert result[0] == self.display_role[0] + assert result[1] == self.display_role[1] + assert result[2] == self.display_role[2] + assert result[3] == self.display_role[3] + assert result[4] == self.display_role[4] + assert result[5] == self.display_role[5] + assert result[6] == self.display_role[6] diff --git a/tests/test_okta_client.py b/tests/test_okta_client.py index ac18156b..3646ed80 100644 --- a/tests/test_okta_client.py +++ b/tests/test_okta_client.py @@ -12,7 +12,6 @@ import responses from fido2.attestation import PackedAttestation from fido2.ctap2 import AttestationObject, AuthenticatorData, AttestedCredentialData -from nose.tools import assert_equals from gimme_aws_creds import errors, ui from gimme_aws_creds.okta import OktaClient @@ -338,7 +337,7 @@ def test_login_username_password(self, mock_pass, mock_input): responses.add(responses.POST, self.okta_org_url + '/api/v1/authn', status=200, body=json.dumps(auth_response)) result = self.client._login_username_password(self.state_token, self.okta_org_url + '/api/v1/authn') - assert_equals(result, {'stateToken': self.state_token, 'apiResponse': auth_response}) + assert result == {'stateToken': self.state_token, 'apiResponse': auth_response} @responses.activate def test_login_send_sms(self): @@ -429,7 +428,7 @@ def test_login_send_sms(self): responses.add(responses.POST, 'https://example.okta.com/api/v1/authn/factors/sms9hmdk2qvhjOQQ30h7/verify', status=200, body=json.dumps(verify_response)) result = self.client._login_send_sms(self.state_token, self.sms_factor) - assert_equals(result, {'stateToken': self.state_token, 'apiResponse': verify_response}) + assert result == {'stateToken': self.state_token, 'apiResponse': verify_response} @responses.activate def test_login_send_push(self): @@ -535,7 +534,7 @@ def test_login_send_push(self): responses.add(responses.POST, 'https://example.okta.com/api/v1/authn/factors/opf9ei43pbAgb2qgc0h7/verify', status=200, body=json.dumps(verify_response)) result = self.client._login_send_push(self.state_token, self.push_factor) - assert_equals(result, {'stateToken': self.state_token, 'apiResponse': verify_response}) + assert result == {'stateToken': self.state_token, 'apiResponse': verify_response} @responses.activate @patch('getpass.getpass', return_value='1234qwert') @@ -585,7 +584,7 @@ def test_login_input_mfa_challenge(self, mock_pass): } responses.add(responses.POST, 'https://example.okta.com/api/v1/authn/factors/sms9hmdk2qvhjOQQ30h7/verify', status=200, body=json.dumps(verify_response)) result = self.client._login_input_mfa_challenge(self.state_token, 'https://example.okta.com/api/v1/authn/factors/sms9hmdk2qvhjOQQ30h7/verify') - assert_equals(result, {'stateToken': self.state_token, 'apiResponse': verify_response}) + assert result == {'stateToken': self.state_token, 'apiResponse': verify_response} @responses.activate @@ -692,7 +691,7 @@ def test_check_push_result(self): responses.add(responses.POST, 'https://example.okta.com/api/v1/authn/factors/opf9ei43pbAgb2qgc0h7/verify', status=200, body=json.dumps(verify_response)) result = self.client._login_send_push(self.state_token, self.push_factor) - assert_equals(result, {'stateToken': self.state_token, 'apiResponse': verify_response}) + assert result == {'stateToken': self.state_token, 'apiResponse': verify_response} @responses.activate @patch('builtins.input', return_value='ann@example.com') @@ -967,7 +966,7 @@ def test_get_saml_response(self): """Test that the SAML reponse was successful""" responses.add(responses.GET, 'https://example.okta.com/app/gimmecreds/exkatg7u9g6LJfFrZ0h7/sso/saml', status=200, body=self.login_saml) result = self.client.get_saml_response('https://example.okta.com/app/gimmecreds/exkatg7u9g6LJfFrZ0h7/sso/saml') - assert_equals(result['TargetUrl'], 'https://localhost:8443/saml/SSO') + assert result['TargetUrl'] == 'https://localhost:8443/saml/SSO' @responses.activate def test_missing_saml_response(self): @@ -984,25 +983,25 @@ def test_missing_saml_response(self): # # The SAMLResponse value doesn't matter because the API response is mocked # saml_data = {'SAMLResponse': 'BASE64_String', 'RelayState': '', 'TargetUrl': 'https://localhost:8443/saml/SSO'} # result = self.client._get_aws_account_info(self.gimme_creds_server, saml_data) - # assert_equals(self.client.aws_access, self.api_results) + # assert self.client.aws_access == self.api_results @patch('builtins.input', return_value='0') def test_choose_factor_sms(self, mock_input): """ Test selecting SMS as a MFA""" result = self.client._choose_factor(self.factor_list) - assert_equals(result, self.sms_factor) + assert result == self.sms_factor @patch('builtins.input', return_value='1') def test_choose_factor_push(self, mock_input): """ Test selecting Okta Verify as a MFA""" result = self.client._choose_factor(self.factor_list) - assert_equals(result, self.push_factor) + assert result == self.push_factor @patch('builtins.input', return_value='2') def test_choose_factor_totp(self, mock_input): """ Test selecting TOTP code as a MFA""" result = self.client._choose_factor(self.factor_list) - assert_equals(result, self.totp_factor) + assert result == self.totp_factor @patch('builtins.input', return_value='12') def test_choose_bad_factor_totp(self, mock_input): @@ -1014,7 +1013,7 @@ def test_choose_bad_factor_totp(self, mock_input): def test_choose_factor_webauthn(self, mock_input): """ Test selecting webauthn code as a MFA""" result = self.client._choose_factor(self.factor_list) - assert_equals(result, self.webauthn_factor) + assert result == self.webauthn_factor @patch('builtins.input', return_value='a') def test_choose_non_number_factor_totp(self, mock_input): @@ -1025,33 +1024,33 @@ def test_choose_non_number_factor_totp(self, mock_input): def test_build_factor_name_sms(self): """ Test building a display name for SMS""" result = self.client._build_factor_name(self.sms_factor) - assert_equals(result, "sms: +1 XXX-XXX-1234") + assert result == "sms: +1 XXX-XXX-1234" def test_build_factor_name_push(self): """ Test building a display name for push""" result = self.client._build_factor_name(self.push_factor) - assert_equals(result, "Okta Verify App: SmartPhone_IPhone: Jane.Doe iPhone") + assert result == "Okta Verify App: SmartPhone_IPhone: Jane.Doe iPhone" def test_build_factor_name_totp(self): """ Test building a display name for TOTP""" result = self.client._build_factor_name(self.totp_factor) - assert_equals(result, "token:software:totp( OKTA ) : jane.doe@example.com") + assert result == "token:software:totp( OKTA ) : jane.doe@example.com" def test_build_factor_name_hardware(self): """ Test building a display name for hardware""" result = self.client._build_factor_name(self.hardware_factor) - assert_equals(result, "token:hardware: YUBICO") + assert result == "token:hardware: YUBICO" def test_build_factor_name_unknown(self): """ Handle an unknown MFA factor""" with self.captured_output() as (out, err): result = self.client._build_factor_name(self.unknown_factor) - assert_equals(result, "Unknown MFA type: UNKNOWN_FACTOR") + assert result == "Unknown MFA type: UNKNOWN_FACTOR" def test_build_factor_name_webauthn_unregistered(self): """ Test building a display name for an unregistered webauthn factor """ result = self.client._build_factor_name(self.webauthn_factor) - assert_equals(result, "webauthn: webauthn") + assert result == "webauthn: webauthn" def test_build_factor_name_webauthn_unregistered_with_authenticator_name(self): """ Test building a display name for an unregistered webauthn factor with a specified authenticator name """ @@ -1061,36 +1060,36 @@ def test_build_factor_name_webauthn_unregistered_with_authenticator_name(self): webauthn_factor_with_authenticator_name['profile']['authenticatorName'] = authenticator_name result = self.client._build_factor_name(self.webauthn_factor) - assert_equals(result, "webauthn: " + authenticator_name) + assert result == "webauthn: " + authenticator_name @patch('gimme_aws_creds.registered_authenticators.RegisteredAuthenticators.get_authenticator_user', return_value='jane.doe@example.com') def test_build_factor_name_webauthn_registered(self, mock_input): """ Test building a display name for a registered webauthn factor """ result = self.client._build_factor_name(self.webauthn_factor) - assert_equals(result, "webauthn: jane.doe@example.com") + assert result == "webauthn: jane.doe@example.com" # def test_get_app_by_name(self): # """ Test selecting app by name""" # self.client.aws_access = self.api_results # result = self.client.get_app_by_name('Sample AWS Account') - # assert_equals(result['name'], 'Sample AWS Account') + # assert result['name'] == 'Sample AWS Account' # # def test_get_role_by_name(self): # """ Test selecting app by name""" # self.client.aws_access = self.api_results # result = self.client.get_role_by_name(self.api_results[0], 'ReadOnly') - # assert_equals(result['name'], 'ReadOnly') + # assert result['name'] == 'ReadOnly' # @patch('builtins.input', return_value='0') # def test_choose_role(self, mock_input): # """ Test selecting role with user input""" # result = self.client.choose_role(self.api_results[0]) - # assert_equals(result['name'], 'ReadOnly') + # assert result['name'] == 'ReadOnly' # # @patch('builtins.input', return_value='0') # def test_choose_app(self, mock_input): # """ Test selecting app with user input""" # self.client.aws_access = self.api_results # result = self.client.choose_app() - # assert_equals(result['name'], 'Sample AWS Account') + # assert result['name'] == 'Sample AWS Account' From 9b04e928e0d1d4fd8cc2f91ae2d99fb9ea3a52d0 Mon Sep 17 00:00:00 2001 From: Zeph / Liz Loss-Cutler-Hull Date: Wed, 14 Sep 2022 10:31:30 -0700 Subject: [PATCH 09/58] Fix tests to handle alias in registered auth. --- tests/test_okta_client.py | 2 +- tests/test_registered_authenticators.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/test_okta_client.py b/tests/test_okta_client.py index 3646ed80..c355c52e 100644 --- a/tests/test_okta_client.py +++ b/tests/test_okta_client.py @@ -1063,7 +1063,7 @@ def test_build_factor_name_webauthn_unregistered_with_authenticator_name(self): assert result == "webauthn: " + authenticator_name @patch('gimme_aws_creds.registered_authenticators.RegisteredAuthenticators.get_authenticator_user', - return_value='jane.doe@example.com') + return_value=['jane.doe@example.com', None]) def test_build_factor_name_webauthn_registered(self, mock_input): """ Test building a display name for a registered webauthn factor """ result = self.client._build_factor_name(self.webauthn_factor) diff --git a/tests/test_registered_authenticators.py b/tests/test_registered_authenticators.py index ce250335..ef9722d7 100644 --- a/tests/test_registered_authenticators.py +++ b/tests/test_registered_authenticators.py @@ -20,7 +20,7 @@ def test_file_creation_post_init(self): def test_add_authenticator_sanity(self): cred_id, user = b'my-credential-id', 'my-user' - self.registered_authenticators.add_authenticator(cred_id, user) + self.registered_authenticators.add_authenticator(cred_id, user, None) with open(self.file_path) as f: data = json.load(f) @@ -34,7 +34,7 @@ def test_add_authenticator_sanity(self): def test_get_authenticator_user_sanity(self): cred_id, user = b'my-credential-id', 'my-user' - self.registered_authenticators.add_authenticator(cred_id, user) + self.registered_authenticators.add_authenticator(cred_id, user, None) - authenticator_user = self.registered_authenticators.get_authenticator_user(cred_id) + authenticator_user, alias = self.registered_authenticators.get_authenticator_user(cred_id) assert authenticator_user == user From a72a3644ee92ab38cf53937c312b2f076a04de8c Mon Sep 17 00:00:00 2001 From: Zeph / Liz Loss-Cutler-Hull Date: Wed, 14 Sep 2022 10:31:59 -0700 Subject: [PATCH 10/58] Add alias tests for registered authenticators --- tests/test_okta_client.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/tests/test_okta_client.py b/tests/test_okta_client.py index c355c52e..e6a3c30b 100644 --- a/tests/test_okta_client.py +++ b/tests/test_okta_client.py @@ -1069,6 +1069,20 @@ def test_build_factor_name_webauthn_registered(self, mock_input): result = self.client._build_factor_name(self.webauthn_factor) assert result == "webauthn: jane.doe@example.com" + @patch('gimme_aws_creds.registered_authenticators.RegisteredAuthenticators.get_authenticator_user', + return_value=[None, "desktop"]) + def test_build_factor_name_webauthn_registered_alias(self, mock_input): + """ Test building a display name for a registered webauthn factor with just an alias """ + result = self.client._build_factor_name(self.webauthn_factor) + assert result == "webauthn: desktop" + + @patch('gimme_aws_creds.registered_authenticators.RegisteredAuthenticators.get_authenticator_user', + return_value=['jane.doe@example.com', "desktop"]) + def test_build_factor_name_webauthn_registered_name_alias(self, mock_input): + """ Test building a display name for a registered webauthn factor with user and alias """ + result = self.client._build_factor_name(self.webauthn_factor) + assert result == "webauthn: jane.doe@example.com: desktop" + # def test_get_app_by_name(self): # """ Test selecting app by name""" # self.client.aws_access = self.api_results From 5d152a3446611ad381631644fcfd88cdaf70ee75 Mon Sep 17 00:00:00 2001 From: Zeph / Liz Loss-Cutler-Hull Date: Tue, 20 Sep 2022 20:01:08 -0700 Subject: [PATCH 11/58] Upgrade to okta 2.0.x. This gets us off the old, no longer supported, beta release. --- gimme_aws_creds/main.py | 8 ++++---- requirements.txt | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/gimme_aws_creds/main.py b/gimme_aws_creds/main.py index 8015f7ad..d55ac624 100644 --- a/gimme_aws_creds/main.py +++ b/gimme_aws_creds/main.py @@ -21,8 +21,8 @@ # extras import boto3 from botocore.exceptions import ClientError -from okta.framework.ApiClient import ApiClient -from okta.framework.OktaError import OktaError +from okta.api_client import APIClient +from okta.errors.error import Error as OktaError # local imports from . import errors, ui @@ -239,8 +239,8 @@ def _get_aws_account_info(okta_org_url, okta_api_key, username): """ Call the Okta User API and process the results to return just the information we need for gimme_aws_creds""" # We need access to the entire JSON response from the Okta APIs, so we need to - # use the low-level ApiClient instead of UsersClient and AppInstanceClient - users_client = ApiClient(okta_org_url, okta_api_key, pathname='/api/v1/users') + # use the low-level APIClient instead of UsersClient and AppInstanceClient + users_client = APIClient(okta_org_url, okta_api_key, pathname='/api/v1/users') # Get User information try: diff --git a/requirements.txt b/requirements.txt index ed5b073d..331b07e0 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,5 +4,5 @@ configparser>=3.5.0,<4.0.0 keyring>=21.4.0 requests>=2.13.0,<3.0.0 fido2>=0.9.1,<0.10.0 -okta>=0.0.4,<1.0.0 +okta>=2.0.0 ctap-keyring-device>=1.0.6 From d6a40c0a53339854afd17fb9ae8f5829c56e1234 Mon Sep 17 00:00:00 2001 From: Zeph / Liz Loss-Cutler-Hull Date: Tue, 20 Sep 2022 20:27:37 -0700 Subject: [PATCH 12/58] Upgrade to python 3.10. --- Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index 493882e7..1f990aa9 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM python:3.8-alpine +FROM python:3.10-alpine WORKDIR /opt/gimme-aws-creds From dafd794a8e81d701c59b10ebc16b055d0145abae Mon Sep 17 00:00:00 2001 From: Zeph / Liz Loss-Cutler-Hull Date: Tue, 20 Sep 2022 20:28:11 -0700 Subject: [PATCH 13/58] Use multiple RUN commands. This is far easier to read, and even gives better status updates. --- Dockerfile | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/Dockerfile b/Dockerfile index 1f990aa9..a7bba090 100644 --- a/Dockerfile +++ b/Dockerfile @@ -8,9 +8,9 @@ RUN apk --update add libgcc ENV PACKAGES="gcc musl-dev python3-dev libffi-dev openssl-dev cargo" -RUN apk --update add $PACKAGES \ - && pip install --upgrade pip setuptools-rust \ - && python setup.py install \ - && apk del --purge $PACKAGES +RUN apk --update add $PACKAGES +RUN pip install --upgrade pip setuptools-rust +RUN python setup.py install +RUN apk del --purge $PACKAGES ENTRYPOINT ["/usr/local/bin/gimme-aws-creds"] From e9ba4e8167973d52a65f738a5cf253ed4055d712 Mon Sep 17 00:00:00 2001 From: Zeph / Liz Loss-Cutler-Hull Date: Tue, 20 Sep 2022 20:28:35 -0700 Subject: [PATCH 14/58] Use pip install instead of setup.py. setup.py is depreciated, and recently broke, so let's just use pip install instead. --- Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index a7bba090..8ee02f2a 100644 --- a/Dockerfile +++ b/Dockerfile @@ -10,7 +10,7 @@ ENV PACKAGES="gcc musl-dev python3-dev libffi-dev openssl-dev cargo" RUN apk --update add $PACKAGES RUN pip install --upgrade pip setuptools-rust -RUN python setup.py install +RUN pip install . RUN apk del --purge $PACKAGES ENTRYPOINT ["/usr/local/bin/gimme-aws-creds"] From fc6d081100afc3c26412e95706007b5173674570 Mon Sep 17 00:00:00 2001 From: Zeph / Liz Loss-Cutler-Hull Date: Tue, 20 Sep 2022 20:29:16 -0700 Subject: [PATCH 15/58] Run tests in docker image. Among other things, this ensures that the tests all pass in a standardized enviornment, lacking anything specific to the machine the developer is running on. --- Dockerfile | 2 ++ Makefile | 2 ++ 2 files changed, 4 insertions(+) diff --git a/Dockerfile b/Dockerfile index 8ee02f2a..2ac3c6c8 100644 --- a/Dockerfile +++ b/Dockerfile @@ -11,6 +11,8 @@ ENV PACKAGES="gcc musl-dev python3-dev libffi-dev openssl-dev cargo" RUN apk --update add $PACKAGES RUN pip install --upgrade pip setuptools-rust RUN pip install . +RUN pip install -r requirements_dev.txt +RUN pytest tests RUN apk del --purge $PACKAGES ENTRYPOINT ["/usr/local/bin/gimme-aws-creds"] diff --git a/Makefile b/Makefile index 64c6b7c1..ea4cddfb 100644 --- a/Makefile +++ b/Makefile @@ -5,4 +5,6 @@ docker-build: docker build -t gimme-aws-creds . test: docker-build + +local_test: pytest -v tests From 3975410fdff05d55792c3ed8779a6e4a12a5461d Mon Sep 17 00:00:00 2001 From: Zeph / Liz Loss-Cutler-Hull Date: Wed, 5 Oct 2022 05:01:25 -0700 Subject: [PATCH 16/58] Switch from getpass to pwinput. This is entirely for a better user interface experience, getting asterisks back instead of nothing while typing a password may leak the length, but almost every single form that takes user input does this so a user gets feedback while they type. Since this includes the Okta web forms, this seems like a straight win. --- gimme_aws_creds/okta.py | 4 ++-- gimme_aws_creds/ui.py | 4 ++-- gimme_aws_creds/webauthn.py | 4 ++-- requirements.txt | 1 + tests/test_okta_client.py | 14 +++++++------- 5 files changed, 14 insertions(+), 13 deletions(-) diff --git a/gimme_aws_creds/okta.py b/gimme_aws_creds/okta.py index 90bf45c1..3215fe8b 100644 --- a/gimme_aws_creds/okta.py +++ b/gimme_aws_creds/okta.py @@ -11,7 +11,7 @@ """ import base64 import copy -import getpass +import pwinput import re import socket import time @@ -900,7 +900,7 @@ def _get_username_password_creds(self): # via OKTA_USERNAME env and user might not remember. for x in range(0, 5): passwd_prompt = "Okta Password for {}: ".format(username) - password = getpass.getpass(prompt=passwd_prompt) + password = pwinput.pwinput(prompt=passwd_prompt) if len(password) > 0: break diff --git a/gimme_aws_creds/ui.py b/gimme_aws_creds/ui.py index e3a8057c..1e83a110 100644 --- a/gimme_aws_creds/ui.py +++ b/gimme_aws_creds/ui.py @@ -10,7 +10,7 @@ See the License for the specific language governing permissions and* limitations under the License.* """ import builtins -import getpass +import pwinput import os import sys @@ -112,7 +112,7 @@ def message(self, message): builtins.print(message, file=sys.stderr) def read_input(self, hidden=False): - return getpass.getpass('') if hidden else builtins.input() + return pwinput.pwinput('') if hidden else builtins.input() def notify(self, message): builtins.print(message, file=sys.stderr) diff --git a/gimme_aws_creds/webauthn.py b/gimme_aws_creds/webauthn.py index 4c57bbf8..a8854ccb 100644 --- a/gimme_aws_creds/webauthn.py +++ b/gimme_aws_creds/webauthn.py @@ -12,7 +12,7 @@ from __future__ import print_function, absolute_import, unicode_literals -from getpass import getpass +import pwinput from threading import Event, Thread from ctap_keyring_device.ctap_keyring_device import CtapKeyringDevice @@ -147,7 +147,7 @@ def _get_pin_from_client(client): return None # Prompt for PIN if needed - pin = getpass("Please enter PIN: ") + pin = pwinput.pwinput("Please enter PIN: ") return pin @staticmethod diff --git a/requirements.txt b/requirements.txt index 331b07e0..2c13b19c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,3 +6,4 @@ requests>=2.13.0,<3.0.0 fido2>=0.9.1,<0.10.0 okta>=2.0.0 ctap-keyring-device>=1.0.6 +pwinput>=1.0.2 diff --git a/tests/test_okta_client.py b/tests/test_okta_client.py index e6a3c30b..a9f8f1fe 100644 --- a/tests/test_okta_client.py +++ b/tests/test_okta_client.py @@ -259,14 +259,14 @@ def test_get_state_token(self): result = self.client._get_initial_flow_state(self.server_embed_link) self.assertEqual(result, {'stateToken': self.state_token, 'apiResponse': auth_response}) - @patch('getpass.getpass', return_value='1234qwert') + @patch('pwinput.pwinput', return_value='1234qwert') @patch('builtins.input', return_value='ann@example.com') def test_get_username_password_creds(self, mock_pass, mock_input): """Test that initial authentication works with Okta""" result = self.client._get_username_password_creds() self.assertDictEqual(result, {'username': 'ann@example.com', 'password': '1234qwert' }) - @patch('getpass.getpass', return_value='1234qwert') + @patch('pwinput.pwinput', return_value='1234qwert') @patch('builtins.input', return_value='') def test_passed_username(self, mock_pass, mock_input): """Test that initial authentication works with Okta""" @@ -274,14 +274,14 @@ def test_passed_username(self, mock_pass, mock_input): result = self.client._get_username_password_creds() self.assertDictEqual(result, {'username': 'ann@example.com', 'password': '1234qwert' }) -# @patch('getpass.getpass', return_value='1234qwert') +# @patch('pwinput.pwinput', return_value='1234qwert') # @patch('builtins.input', return_value='ann') # def test_bad_username(self, mock_pass, mock_input): # """Test that initial authentication works with Okta""" # with self.assertRaises(errors.GimmeAWSCredsExitBase): # self.client._get_username_password_creds() - @patch('getpass.getpass', return_value='') + @patch('pwinput.pwinput', return_value='') @patch('builtins.input', return_value='ann@example.com') def test_missing_password(self, mock_pass, mock_input): """Test that initial authentication works with Okta""" @@ -289,7 +289,7 @@ def test_missing_password(self, mock_pass, mock_input): self.client._get_username_password_creds() @responses.activate - @patch('getpass.getpass', return_value='1234qwert') + @patch('pwinput.pwinput', return_value='1234qwert') @patch('builtins.input', return_value='ann@example.com') def test_login_username_password(self, mock_pass, mock_input): """Test that initial authentication works with Okta""" @@ -537,7 +537,7 @@ def test_login_send_push(self): assert result == {'stateToken': self.state_token, 'apiResponse': verify_response} @responses.activate - @patch('getpass.getpass', return_value='1234qwert') + @patch('pwinput.pwinput', return_value='1234qwert') def test_login_input_mfa_challenge(self, mock_pass): """Test that MFA works with Okta""" @@ -695,7 +695,7 @@ def test_check_push_result(self): @responses.activate @patch('builtins.input', return_value='ann@example.com') - @patch('getpass.getpass', return_value='1234qwert') + @patch('pwinput.pwinput', return_value='1234qwert') @patch('gimme_aws_creds.webauthn.WebAuthnClient.make_credential', return_value=(b'', AttestationObject.create( PackedAttestation.FORMAT, AuthenticatorData.create( hashlib.sha256(b'example.okta.com').digest(), From 1d7881383104d5db550c1837f50594d1019abaa3 Mon Sep 17 00:00:00 2001 From: Zeph / Liz Loss-Cutler-Hull Date: Wed, 5 Oct 2022 05:03:00 -0700 Subject: [PATCH 17/58] Don't import the unused ui, linter fix. --- gimme_aws_creds/okta.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gimme_aws_creds/okta.py b/gimme_aws_creds/okta.py index 3215fe8b..676020be 100644 --- a/gimme_aws_creds/okta.py +++ b/gimme_aws_creds/okta.py @@ -32,7 +32,7 @@ from gimme_aws_creds.u2f import FactorU2F from gimme_aws_creds.webauthn import WebAuthnClient, FakeAssertion -from . import errors, ui, version, duo +from . import errors, version, duo from .errors import GimmeAWSCredsMFAEnrollStatus from .registered_authenticators import RegisteredAuthenticators From 181ebeace98d799703c08cae8fdb39d757437274 Mon Sep 17 00:00:00 2001 From: Zeph / Liz Loss-Cutler-Hull Date: Wed, 5 Oct 2022 05:03:53 -0700 Subject: [PATCH 18/58] Use assertion.credential['id'] for credentialID. Getting it this way has two important benefits: 1: It is already decoded. 2: When we support a single prompt for multiple webauthn keys, it will be the only possible way to know which key was used. 2 sounds like the more compelling reason, however the feature itself is of somewhat questionable value in that case. --- gimme_aws_creds/okta.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/gimme_aws_creds/okta.py b/gimme_aws_creds/okta.py index 676020be..8882161b 100644 --- a/gimme_aws_creds/okta.py +++ b/gimme_aws_creds/okta.py @@ -688,13 +688,12 @@ def _check_webauthn_result(self, state_token, login_data): response_data = response.json() if 'status' in response_data and response_data['status'] == 'SUCCESS': - decoded = websafe_decode(credential_id) registered_authenticators = RegisteredAuthenticators(self.ui) - user_name, alias = registered_authenticators.get_authenticator_user(decoded) + user_name, alias = registered_authenticators.get_authenticator_user(assertion.credential['id']) if alias is None: alias = self.ui.input('Alias for webauthn token: ') if alias is not None and alias != "": - registered_authenticators.add_authenticator(decoded, user_name, alias) + registered_authenticators.add_authenticator(assertion.credential['id'], user_name, alias) if 'stateToken' in response_data: return {'stateToken': response_data['stateToken'], 'apiResponse': response_data} From 77a423e2b038bfb50f8584eb45772f63c875a0a8 Mon Sep 17 00:00:00 2001 From: Zeph / Liz Loss-Cutler-Hull Date: Wed, 5 Oct 2022 05:06:00 -0700 Subject: [PATCH 19/58] Replace assert with explicit check. This is because asserts get dropped if the code ever gets compiled to python bytecode. This is largely just a linter fix. --- gimme_aws_creds/webauthn.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/gimme_aws_creds/webauthn.py b/gimme_aws_creds/webauthn.py index a8854ccb..1ef6fffa 100644 --- a/gimme_aws_creds/webauthn.py +++ b/gimme_aws_creds/webauthn.py @@ -87,7 +87,9 @@ def _verify(self, client): pin=pin) self.ui.info('Processing...\n') self._assertions = assertion_selection.get_assertions() - assert len(self._assertions) >= 0 + if len(self._assertions) < 0: + self.ui.info('No assertions from key.') + raise assertion_res = assertion_selection.get_response(0) self._client_data = assertion_res.client_data From 17171eb040902414dffbb2a8b0c9836a3aab4ab3 Mon Sep 17 00:00:00 2001 From: Zeph / Liz Loss-Cutler-Hull Date: Wed, 5 Oct 2022 05:06:39 -0700 Subject: [PATCH 20/58] Support for passing a list of credential IDs. If we get a list of IDs, then we pass the whole list as the accept list, and if any of them are supported by a given device we can use it. This is required for being able to just ask the user if they want to use webauthn, instead of asking them to pick which webauthn device from a potentially unlabeled list that they want to use. --- gimme_aws_creds/webauthn.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/gimme_aws_creds/webauthn.py b/gimme_aws_creds/webauthn.py index 1ef6fffa..f6cdba5f 100644 --- a/gimme_aws_creds/webauthn.py +++ b/gimme_aws_creds/webauthn.py @@ -39,7 +39,7 @@ def __init__(self, ui, okta_org_url, challenge, credential_id=None, timeout_ms=3 """ :param okta_org_url: Base URL string for Okta IDP. :param challenge: Challenge - :param credential_id: FIDO credential ID + :param credential_id: FIDO credential ID, or list of FIDO credential IDs. """ self.ui = ui self._okta_org_url = okta_org_url @@ -51,11 +51,16 @@ def __init__(self, ui, okta_org_url, challenge, credential_id=None, timeout_ms=3 self._assertions = None self._client_data = None self._rp = {'id': okta_org_url[8:], 'name': okta_org_url[8:]} + self._allow_list = [] if credential_id: - self._allow_list = [ - PublicKeyCredentialDescriptor(PublicKeyCredentialType.PUBLIC_KEY, websafe_decode(credential_id)) - ] + if type(credential_id) is list: + for id in credential_id: + self._allow_list.append(PublicKeyCredentialDescriptor(PublicKeyCredentialType.PUBLIC_KEY, websafe_decode(id))) + else: + self._allow_list = [ + PublicKeyCredentialDescriptor(PublicKeyCredentialType.PUBLIC_KEY, websafe_decode(credential_id)) + ] def locate_device(self): # Locate a device From e98a740005e5cf5dab64cf385cc73f521194e02a Mon Sep 17 00:00:00 2001 From: Zeph / Liz Loss-Cutler-Hull Date: Wed, 5 Oct 2022 05:08:41 -0700 Subject: [PATCH 21/58] Rework except logic a little bit. This is almost entirely to quiet the linter, however it also simplifies the code a little bit, and adds a comment to explain the code. Specifically, we should either check each and every layer of the object tree to see if the components exist, or we should rely on the AttributeError exception, not a combination of both. And we should only catch AttributeError, so we don't mask code problems in self._print_correct_answer. --- gimme_aws_creds/okta.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/gimme_aws_creds/okta.py b/gimme_aws_creds/okta.py index 8882161b..2cc142dd 100644 --- a/gimme_aws_creds/okta.py +++ b/gimme_aws_creds/okta.py @@ -610,13 +610,14 @@ def _check_push_result(self, state_token, login_data): response_data = response.json() + # We could check for the existence of every layer... + # Or we can just catch AttributeError and pass, with a nosec comment. try: - if '_embedded' in response_data['_embedded']['factor']: - if response_data['_embedded']['factor']['_embedded']['challenge']['correctAnswer']: - if self._print_correct_answer: - self._print_correct_answer(response_data['_embedded']['factor']['_embedded']['challenge']['correctAnswer']) - self._print_correct_answer = None - except: + if response_data['_embedded']['factor']['_embedded']['challenge']['correctAnswer']: + if self._print_correct_answer: + self._print_correct_answer(response_data['_embedded']['factor']['_embedded']['challenge']['correctAnswer']) + self._print_correct_answer = None + except AttributeError: # nosec pass if 'stateToken' in response_data: From a9b63e9a5b4b2e14aa1e211e6e875906ea923bb7 Mon Sep 17 00:00:00 2001 From: Zeph / Liz Loss-Cutler-Hull Date: Wed, 5 Oct 2022 05:11:11 -0700 Subject: [PATCH 22/58] Whitespace cleanup. Nothing to see here, just making the linter happier. --- gimme_aws_creds/okta.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gimme_aws_creds/okta.py b/gimme_aws_creds/okta.py index 2cc142dd..241ae252 100644 --- a/gimme_aws_creds/okta.py +++ b/gimme_aws_creds/okta.py @@ -1050,7 +1050,7 @@ def _introspect_factors(self, state_token): @staticmethod def _extract_state_token_from_http_response(http_res): saml_soup = BeautifulSoup(http_res.text, "html.parser") - + mfa_string = ( 'Dodatečné ověření', 'Ekstra verificering', From 48780adb24bc4212cc286dd3f915f2db762f2424 Mon Sep 17 00:00:00 2001 From: Zeph / Liz Loss-Cutler-Hull Date: Wed, 5 Oct 2022 05:13:17 -0700 Subject: [PATCH 23/58] Rework the DUO factor checks. If we get DUO Web, only add DUO Push if we don't get DUO Push, and likewise only add DUO Passcode if we don't get DUO Passcode. And while we are at it, do both of the above even if we do get other MFA options, like Webauthn. --- gimme_aws_creds/okta.py | 44 ++++++++++++++++++++++++++++++++--------- 1 file changed, 35 insertions(+), 9 deletions(-) diff --git a/gimme_aws_creds/okta.py b/gimme_aws_creds/okta.py index 241ae252..4d12c08e 100644 --- a/gimme_aws_creds/okta.py +++ b/gimme_aws_creds/okta.py @@ -780,17 +780,43 @@ def _choose_factor(self, factors): self.ui.info("Multi-factor Authentication required.") - # filter the factor list down to just the types specified in preferred_mfa_type - preferred_factors = [] - # even though duo supports both passcode and push, okta only lists web as an available factor. This if statement - # adds the additional supported factors only if the provider is duo, and the web factor is the only one provided - if len(factors) == 1 and factors[0].get('provider') == 'DUO' and factors[0].get('factorType') == 'web': - push = copy.deepcopy(factors[0]) + new_factors = [] + seen_duo_push = False + seen_duo_passcode = False + seen_duo_web = False + duo_web = None + + for factor in factors: + skip = False + if factor['provider'] == 'DUO': + if factor['factorType'] == 'passcode': + seen_duo_passcode = True + elif factor['factorType'] == 'push': + seen_duo_push = True + elif factor['factorType'] == 'web': + seen_duo_web = True + duo_web = factor + + if not skip: + new_factors.append(factor) + + # Even though duo supports both passcode and push, okta only lists web as an available factor. + + # If we received due web and not due push, add due push anyhow. + if seen_duo_web and not seen_duo_push: + push = copy.deepcopy(duo_web) push['factorType'] = "push" - factors.append(push) - passcode = copy.deepcopy(factors[0]) + new_factors.append(push) + + # If we received due web and not due push, add due passcode anyhow. + if seen_duo_web and not seen_duo_passcode: + passcode = copy.deepcopy(duo_web) passcode['factorType'] = "passcode" - factors.append(passcode) + new_factors.append(passcode) + + factors = new_factors + + # filter the factor list down to just the types specified in preferred_mfa_type if self._preferred_mfa_type is not None: preferred_factors = list(filter(lambda item: item['factorType'] == self._preferred_mfa_type, factors)) # If the preferred factor isn't in the list of available factors, we'll let the user know before From a9f6d232c2825d4407b24fa59dfa8214f83ebb55 Mon Sep 17 00:00:00 2001 From: Zeph / Liz Loss-Cutler-Hull Date: Wed, 5 Oct 2022 05:15:28 -0700 Subject: [PATCH 24/58] Rework preferred factors slightly. Now, preferred factors just replaces the factors list outright if we have any matches. And additionally, we now only prompt for factor selection if we have more than 1 possible MFA factor. (After all, if we are only given a single option, there's no point in ever prompting.) --- gimme_aws_creds/okta.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/gimme_aws_creds/okta.py b/gimme_aws_creds/okta.py index 4d12c08e..6047620a 100644 --- a/gimme_aws_creds/okta.py +++ b/gimme_aws_creds/okta.py @@ -823,11 +823,14 @@ def _choose_factor(self, factors): # prompting to select another. if not preferred_factors: self.ui.notify('Preferred factor type of {} not available.'.format(self._preferred_mfa_type)) + else: + factors = preferred_factors - if len(preferred_factors) == 1: - factor_name = self._build_factor_name(preferred_factors[0]) + # Don't bother prompting if we only have a single factor option. + if len(factors) == 1: + factor_name = self._build_factor_name(factors[0]) self.ui.info(factor_name + ' selected') - selection = factors.index(preferred_factors[0]) + selection = 0 else: self.ui.info("Pick a factor:") # print out the factors and let the user select From 86c28d73168343d9cce21e1269be11e838876e26 Mon Sep 17 00:00:00 2001 From: Zeph / Liz Loss-Cutler-Hull Date: Wed, 5 Oct 2022 05:17:13 -0700 Subject: [PATCH 25/58] Consider N Webauthn factors as a single option. Currently, if a user has 5 Webauthn keys configured in Okta (or, more likely, 2), the user gets 5 separate options in the factor list. This is... Not horribly user friendly, even if every single one does have a name configured for them. It turns out that Okta lets us ask for a nonce valid for every single one of them. If we take that path, we can take that nonce, and the whole list of possible challenge IDs, and pass it down to the webauthn code. At that point, we can let it pick whichever device is actually plugged in to this computer at this time. This means both that we can just give a single option in the factor list for Webauthn, and that setting the preferred MFA type to webauthn can Just Work even when a user has multiple webauthn tokens configured. This makes for a much better UI flow for users. The only big outstanding question I have is if the code around having names / labels for webauthn tokens makes any sense after this. --- gimme_aws_creds/okta.py | 38 +++++++++++++++++++++++++++++++------- 1 file changed, 31 insertions(+), 7 deletions(-) diff --git a/gimme_aws_creds/okta.py b/gimme_aws_creds/okta.py index 6047620a..dc4a7639 100644 --- a/gimme_aws_creds/okta.py +++ b/gimme_aws_creds/okta.py @@ -314,10 +314,15 @@ def _next_login_step(self, state_token, login_data): elif status == 'MFA_REQUIRED': return self._login_multi_factor(state_token, login_data) elif status == 'MFA_CHALLENGE': - if login_data['_embedded']['factor']['factorType'] == 'u2f': - return self._check_u2f_result(state_token, login_data) - if login_data['_embedded']['factor']['factorType'] == 'webauthn': + if 'factor' in login_data['_embedded']: + if login_data['_embedded']['factor']['factorType'] == 'u2f': + return self._check_u2f_result(state_token, login_data) + if login_data['_embedded']['factor']['factorType'] == 'webauthn': + return self._check_webauthn_result(state_token, login_data) + # This assumes that if there are multiple factors, they are all of the same type. + elif 'factors' in login_data['_embedded'] and login_data['_embedded']['factors'][0]['factorType'] == 'webauthn': return self._check_webauthn_result(state_token, login_data) + if 'factorResult' in login_data and login_data['factorResult'] == 'WAITING': return self._check_push_result(state_token, login_data) else: @@ -662,11 +667,19 @@ def _check_u2f_result(self, state_token, login_data): def _check_webauthn_result(self, state_token, login_data): """ wait for webauthN challenge """ - nonce = login_data['_embedded']['factor']['_embedded']['challenge']['challenge'] - credential_id = login_data['_embedded']['factor']['profile']['credentialId'] + cred_list = [] + nonce = '' + + if 'factor' in login_data['_embedded']: + cred_list.append(login_data['_embedded']['factor']['profile']['credentialId']) + nonce = login_data['_embedded']['factor']['_embedded']['challenge']['challenge'] + elif 'factors' in login_data['_embedded']: + for factor in login_data['_embedded']['factors']: + cred_list.append(factor['profile']['credentialId']) + nonce = login_data['_embedded']['challenge']['challenge'] """ Authenticator """ - webauthn_client = WebAuthnClient(self.ui, self._okta_org_url, nonce, credential_id) + webauthn_client = WebAuthnClient(self.ui, self._okta_org_url, nonce, cred_list) # noinspection PyBroadException try: client_data, assertion = webauthn_client.verify() @@ -781,6 +794,7 @@ def _choose_factor(self, factors): self.ui.info("Multi-factor Authentication required.") new_factors = [] + seen_webauthn = False seen_duo_push = False seen_duo_passcode = False seen_duo_web = False @@ -788,7 +802,17 @@ def _choose_factor(self, factors): for factor in factors: skip = False - if factor['provider'] == 'DUO': + # Collapse together multiple webauthn entries, so the user isn't + # asked to pick which webauthn key is plugged in. + if factor['provider'] == 'FIDO': + if factor['factorType'] == 'webauthn': + if not seen_webauthn: + seen_webauthn = True + factor['profile']['authenticatorName'] = 'Any configured key' + factor['_links']['verify']['href'] = self._okta_org_url + '/api/v1/authn/factors/webauthn/verify' + else: + skip = True + elif factor['provider'] == 'DUO': if factor['factorType'] == 'passcode': seen_duo_passcode = True elif factor['factorType'] == 'push': From b07a61d1ff5bde687d4d14528fc621b715101a11 Mon Sep 17 00:00:00 2001 From: Zeph / Liz Loss-Cutler-Hull Date: Wed, 5 Oct 2022 06:38:32 -0700 Subject: [PATCH 26/58] fido2 1.0.0 support. This makes the changes required to support the 1.0.0 and above releases of fido2. The biggest two changes are in having to implement fido2.client.UserInteraction instead of just passing in the PIN and an on_keepalive function callback, and in having to upgrade to a version of ctap-keyring-device which also supports fido2 1.0.0. For ctap-keyring-device, we have the complication that while 2.0.0 supports fido2 1.0.0, it seems that the 2.0.0 release hasn't actually been, well, released yet. As such, we're quite improperly importing directly from git right now. We should fix that. --- gimme_aws_creds/webauthn.py | 54 ++++++++++++++++++++++++------------- requirements.txt | 7 +++-- tests/test_okta_client.py | 2 +- 3 files changed, 41 insertions(+), 22 deletions(-) diff --git a/gimme_aws_creds/webauthn.py b/gimme_aws_creds/webauthn.py index f6cdba5f..7060de1f 100644 --- a/gimme_aws_creds/webauthn.py +++ b/gimme_aws_creds/webauthn.py @@ -18,12 +18,14 @@ from ctap_keyring_device.ctap_keyring_device import CtapKeyringDevice from ctap_keyring_device.ctap_strucs import CtapOptions from fido2 import cose -from fido2.client import Fido2Client, ClientError +from fido2.client import Fido2Client, ClientError, UserInteraction +from fido2.ctap2.pin import ClientPin from fido2.hid import CtapHidDevice, STATUS from fido2.utils import websafe_decode from fido2.webauthn import PublicKeyCredentialCreationOptions, \ PublicKeyCredentialType, PublicKeyCredentialParameters, PublicKeyCredentialDescriptor, UserVerificationRequirement from fido2.webauthn import PublicKeyCredentialRequestOptions +from typing import Optional from gimme_aws_creds.errors import NoFIDODeviceFoundError, FIDODeviceTimeoutError @@ -34,6 +36,34 @@ def __init__(self): self.auth_data = b'fake' +class UI(UserInteraction): + def __init__(self, ui): + self.ui = ui + self._has_prompted = False + + def prompt_up(self) -> None: + if not self._has_prompted: + self.ui.info('\nTouch your authenticator device now...\n') + self._has_prompted = True + + def request_pin( + self, permissions: ClientPin.PERMISSION, rp_id: Optional[str] + ) -> Optional[str]: + """Called when the client requires a PIN from the user. + + Should return a PIN, or None/Empty to cancel.""" + return pwinput.pwinput("Please enter PIN: ") + + def request_uv( + self, permissions: ClientPin.PERMISSION, rp_id: Optional[str] + ) -> bool: + """Called when the client is about to request UV from the user. + + Should return True if allowed, or False to cancel.""" + self.ui.info("User Verification requested.") + return True + + class WebAuthnClient(object): def __init__(self, ui, okta_org_url, challenge, credential_id=None, timeout_ms=30_000): """ @@ -52,6 +82,7 @@ def __init__(self, ui, okta_org_url, challenge, credential_id=None, timeout_ms=3 self._client_data = None self._rp = {'id': okta_org_url[8:], 'name': okta_org_url[8:]} self._allow_list = [] + self.user_interaction = UI(ui=ui) if credential_id: if type(credential_id) is list: @@ -68,7 +99,7 @@ def locate_device(self): if not devs: devs = CtapKeyringDevice.list_devices() - self._clients = [Fido2Client(d, self._okta_org_url) for d in devs] + self._clients = [Fido2Client(device=d, origin=self._okta_org_url, user_interaction=self.user_interaction) for d in devs] def on_keepalive(self, status): if status == STATUS.UPNEEDED and not self._has_prompted: @@ -86,10 +117,7 @@ def _verify(self, client): allow_credentials=self._allow_list, timeout=self._timeout_ms, user_verification=user_verification) - pin = self._get_pin_from_client(client) - assertion_selection = client.get_assertion(options, event=self._event, - on_keepalive=self.on_keepalive, - pin=pin) + assertion_selection = client.get_assertion(options, event=self._event) self.ui.info('Processing...\n') self._assertions = assertion_selection.get_assertions() if len(self._assertions) < 0: @@ -119,10 +147,7 @@ def _make_credential(self, client, user): options = PublicKeyCredentialCreationOptions(self._rp, user, self._challenge, pub_key_cred_params, timeout=self._timeout_ms) - pin = self._get_pin_from_client(client) - attestation_res = client.make_credential(options, event=self._event, - on_keepalive=self.on_keepalive, - pin=pin) + attestation_res = client.make_credential(options, event=self._event) self._attestation, self._client_data = attestation_res.attestation_object, attestation_res.client_data self._event.set() @@ -148,15 +173,6 @@ def _run_in_thread(self, method, *args, **kwargs): self.ui.info('Operation timed out or no valid Security Key found !') raise FIDODeviceTimeoutError - @staticmethod - def _get_pin_from_client(client): - if not client.info.options.get(CtapOptions.CLIENT_PIN): - return None - - # Prompt for PIN if needed - pin = pwinput.pwinput("Please enter PIN: ") - return pin - @staticmethod def _get_user_verification_requirement_from_client(client): if not client.info.options.get(CtapOptions.USER_VERIFICATION): diff --git a/requirements.txt b/requirements.txt index 2c13b19c..2d3e7d01 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,7 +3,10 @@ beautifulsoup4>=4.6.0,<5.0.0 configparser>=3.5.0,<4.0.0 keyring>=21.4.0 requests>=2.13.0,<3.0.0 -fido2>=0.9.1,<0.10.0 +fido2>=1.0.0,<2.0.0 okta>=2.0.0 -ctap-keyring-device>=1.0.6 +# ctap-keyring-device>=1.0.6 +# We need ctap-keyring-device>=2.0.0, but that hasn't been released yet. +# So we're grabbing the git release with the applicable fixes. +git+https://github.com/dany74q/ctap-keyring-device@de144921c6a0fb005422a6e3dc80966f01663e0a pwinput>=1.0.2 diff --git a/tests/test_okta_client.py b/tests/test_okta_client.py index a9f8f1fe..0df5869d 100644 --- a/tests/test_okta_client.py +++ b/tests/test_okta_client.py @@ -11,7 +11,7 @@ import requests import responses from fido2.attestation import PackedAttestation -from fido2.ctap2 import AttestationObject, AuthenticatorData, AttestedCredentialData +from fido2.webauthn import AttestationObject, AuthenticatorData, AttestedCredentialData from gimme_aws_creds import errors, ui from gimme_aws_creds.okta import OktaClient From f0e0dc025ceb1b9ff1a81415f512210ed6fd2a4b Mon Sep 17 00:00:00 2001 From: Zeph / Liz Loss-Cutler-Hull Date: Wed, 5 Oct 2022 06:52:23 -0700 Subject: [PATCH 27/58] Use defusedxml to silence security warnings. The linter quite rightfully complains that parsing untrusted XML is not something done lightly. --- gimme_aws_creds/aws.py | 2 +- requirements.txt | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/gimme_aws_creds/aws.py b/gimme_aws_creds/aws.py index 4e0160f3..2d679a8e 100644 --- a/gimme_aws_creds/aws.py +++ b/gimme_aws_creds/aws.py @@ -10,7 +10,7 @@ See the License for the specific language governing permissions and* limitations under the License.* """ import base64 -import xml.etree.ElementTree as ET +import defusedxml.ElementTree as ET import requests from bs4 import BeautifulSoup diff --git a/requirements.txt b/requirements.txt index 2d3e7d01..ea7083c8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -10,3 +10,4 @@ okta>=2.0.0 # So we're grabbing the git release with the applicable fixes. git+https://github.com/dany74q/ctap-keyring-device@de144921c6a0fb005422a6e3dc80966f01663e0a pwinput>=1.0.2 +defusedxml From 6787e7a7ca542fdae5f0655f86346829be083ee8 Mon Sep 17 00:00:00 2001 From: Zeph / Liz Loss-Cutler-Hull Date: Wed, 5 Oct 2022 06:53:19 -0700 Subject: [PATCH 28/58] Use allowed_methods for Retry. This makes the depreciation warnings much happier. --- gimme_aws_creds/aws.py | 2 +- gimme_aws_creds/okta.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/gimme_aws_creds/aws.py b/gimme_aws_creds/aws.py index 2d679a8e..0805ac77 100644 --- a/gimme_aws_creds/aws.py +++ b/gimme_aws_creds/aws.py @@ -39,7 +39,7 @@ def __init__(self, verify_ssl_certs=True): # Allow up to 5 retries on requests to AWS in case we have network issues self._http_client = requests.Session() retries = Retry(total=5, backoff_factor=1, - method_whitelist=['POST']) + allowed_methods=['POST']) self._http_client.mount('https://', HTTPAdapter(max_retries=retries)) def get_signinpage(self, saml_token, saml_target_url): diff --git a/gimme_aws_creds/okta.py b/gimme_aws_creds/okta.py index dc4a7639..40cf56cb 100644 --- a/gimme_aws_creds/okta.py +++ b/gimme_aws_creds/okta.py @@ -81,7 +81,7 @@ def __init__(self, gac_ui, okta_org_url, verify_ssl_certs=True, device_token=Non self.device_token = device_token retries = Retry(total=5, backoff_factor=1, - method_whitelist=['GET', 'POST']) + allowed_methods=['GET', 'POST']) self._http_client.mount('https://', HTTPAdapter(max_retries=retries)) @property From e6f8eb400393acae895da1f5bfd06f65975dbedc Mon Sep 17 00:00:00 2001 From: Zeph / Liz Loss-Cutler-Hull Date: Wed, 5 Oct 2022 06:53:39 -0700 Subject: [PATCH 29/58] Remove some whitespace on empty lines. This makes the linter happier. --- gimme_aws_creds/aws.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/gimme_aws_creds/aws.py b/gimme_aws_creds/aws.py index 0805ac77..5ad954a4 100644 --- a/gimme_aws_creds/aws.py +++ b/gimme_aws_creds/aws.py @@ -48,7 +48,7 @@ def get_signinpage(self, saml_token, saml_target_url): 'SAMLResponse': saml_token, 'RelayState': '' } - + response = self._http_client.post( saml_target_url, data=payload, @@ -58,7 +58,7 @@ def get_signinpage(self, saml_token, saml_target_url): def _enumerate_saml_roles(self, assertion, saml_target_url): signin_page = self.get_signinpage(assertion, saml_target_url) - + """ using the assertion to fetch aws sign-in page, parse it and return aws sts creds """ role_pairs = [] root = ET.fromstring(base64.b64decode(assertion)) @@ -80,10 +80,10 @@ def _enumerate_saml_roles(self, assertion, saml_target_url): raise errors.GimmeAWSCredsError('Parsing error on {}'.format(role_pair)) else: table[role] = idp - + # init parser soup = BeautifulSoup(signin_page, 'html.parser') - + # find all roles roles = soup.find_all("div", attrs={"class": "saml-role"}) # Normalize pieces of string; @@ -120,7 +120,7 @@ def _display_role(roles): if not current_account == last_account: role_strs.append(current_account) last_account = current_account - + role_strs.append(' [ {} ]: {}'.format(i, role.friendly_role_name)) return role_strs From 4722d9ec22c0b85bb60a1cdf5c0f006955fdfbe1 Mon Sep 17 00:00:00 2001 From: Zeph / Liz Loss-Cutler-Hull Date: Wed, 5 Oct 2022 07:22:06 -0700 Subject: [PATCH 30/58] Allow remember_device to work with webauthn. I can't see any reason why this wasn't previously allowed, we do it for all the other auth types. Now, I can think of reasons not to do it, but, well, yeah. --- gimme_aws_creds/okta.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/gimme_aws_creds/okta.py b/gimme_aws_creds/okta.py index 40cf56cb..7a7ef50a 100644 --- a/gimme_aws_creds/okta.py +++ b/gimme_aws_creds/okta.py @@ -692,7 +692,8 @@ def _check_webauthn_result(self, state_token, login_data): auth_data = base64.b64encode(assertion.auth_data).decode('utf-8') response = self._http_client.post( - login_data['_links']['next']['href'] + "?rememberDevice=false", + login_data['_links']['next']['href'], + params={'rememberDevice': self._remember_device}, json={'stateToken': state_token, 'clientData': client_data, 'signatureData': signature_data, 'authenticatorData': auth_data}, headers=self._get_headers(), From 4a6a99a7aadbb6a34e6fb8220a1d3f26f999022d Mon Sep 17 00:00:00 2001 From: Zeph / Liz Loss-Cutler-Hull Date: Wed, 5 Oct 2022 09:35:03 -0700 Subject: [PATCH 31/58] Cleanup whitespace for linters. --- gimme_aws_creds/main.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/gimme_aws_creds/main.py b/gimme_aws_creds/main.py index d55ac624..2d176084 100644 --- a/gimme_aws_creds/main.py +++ b/gimme_aws_creds/main.py @@ -505,7 +505,7 @@ def conf_dict(self): :rtype: dict """ # noinspection PyUnusedLocal - config = self.config + config = self.config # noqa return self._cache['conf_dict'] @property @@ -800,7 +800,7 @@ def _run(self): self.handle_action_store_json_creds() self.handle_action_list_roles() self.handle_setup_fido_authenticator() - + # for each data item, if we have an override on output, prioritize that # if we do not, prioritize writing credentials to file if that is in our # configuration. If we are not writing to a credentials file, use whatever @@ -837,7 +837,6 @@ def write_result_action(self, action, data): self.ui.result("export AWS_SECURITY_TOKEN=" + data['credentials']['aws_security_token']) - def handle_action_configure(self): # Create/Update config when configure arg set if not self.config.action_configure: @@ -875,7 +874,7 @@ def handle_action_register_device(self): self.ui.notify('*** You may be prompted for MFA more than once for this run.\n') auth_result = self.auth_session - base_config = self.config.get_config_dict(include_inherits = False) + base_config = self.config.get_config_dict(include_inherits=False) base_config['device_token'] = auth_result['device_token'] self.config.write_config_file(base_config) self.okta.device_token = base_config['device_token'] From 1a0d785dcebc3b7344d0c18cb1c22511cb0a51b9 Mon Sep 17 00:00:00 2001 From: Zeph / Liz Loss-Cutler-Hull Date: Wed, 5 Oct 2022 09:39:50 -0700 Subject: [PATCH 32/58] Attempt to keep and use the same Okta session. This is a small code change, and a huge functionality change. And it is only supported in 'internal' and 'appurl' gimmie_creds_server modes. But with a successful authorization via Okta, we will now store the username and Okta session cookie in the config file as session_username and session_token. When we go to authorize, we load both from the config, and if the username is either unset, or it matches what is in session_username, we will attempt to renew the session cookie. If the Okta session is still valid, that will extend it by an Okta configured duration, and we then skip the entire authorization flow and just use that Okta session. If the session is not valid, we will get a 404 back, and we continue forward with the standard flow. There is a significant catch with this, if you want to change Okta usernames, you must specify the username on the command line. --- gimme_aws_creds/main.py | 10 ++++++++++ gimme_aws_creds/okta.py | 30 ++++++++++++++++++++++++++++++ 2 files changed, 40 insertions(+) diff --git a/gimme_aws_creds/main.py b/gimme_aws_creds/main.py index 2d176084..417b7a9c 100644 --- a/gimme_aws_creds/main.py +++ b/gimme_aws_creds/main.py @@ -562,6 +562,9 @@ def okta(self): else: okta.set_use_keyring(True) + if self.conf_dict.get('session_token') is not None and self.conf_dict.get('session_username') is not None: + okta.set_session_token(self.conf_dict.get('session_username'), self.conf_dict.get('session_token')) + return okta def get_resolver(self): @@ -581,6 +584,13 @@ def device_token(self): def set_auth_session(self, auth_session): self._cache['auth_session'] = auth_session + okta = self._cache['okta'] + base_config = self.config.get_config_dict(include_inherits=False) + base_config['session_username'] = auth_session['username'] + base_config['session_token'] = auth_session['session'] + self.config.write_config_file(base_config) + okta.set_session_token(base_config.get('session_username'), base_config.get('session_token')) + @property def auth_session(self): if 'auth_session' in self._cache: diff --git a/gimme_aws_creds/okta.py b/gimme_aws_creds/okta.py index 7a7ef50a..6c7fc0c7 100644 --- a/gimme_aws_creds/okta.py +++ b/gimme_aws_creds/okta.py @@ -72,6 +72,9 @@ def __init__(self, gac_ui, okta_org_url, verify_ssl_certs=True, device_token=Non self._oauth_access_token = None self._oauth_id_token = None + self._session_username = None + self._session_token = None + self._jar = requests.cookies.RequestsCookieJar() # Allow up to 5 retries on requests to Okta in case we have network issues @@ -94,6 +97,13 @@ def device_token(self, device_token): match = re.search(r'^https://(.*)/?', self._okta_org_url) self._http_client.cookies.set('DT', device_token, domain=match.group(1), path='/') + def set_session_token(self, username, session): + if self._username is not None and self._username != username: + # If the username has changed, the session token is not valid. + return + self._session_username = username + self._session_token = session + def set_username(self, username): self._username = username @@ -164,6 +174,26 @@ def auth(self): def auth_session(self, **kwargs): """ Authenticate the user and return the Okta Session ID and username""" + if self._session_username is not None and self._session_token is not None: + match = re.search(r'^https://(.*)/?', self._okta_org_url) + self._http_client.cookies.set('sid', self._session_token, domain=match.group(1), path='/') + session_url = self._okta_org_url + '/api/v1/sessions/me/lifecycle/refresh' + response = self._http_client.post( + session_url, + headers=self._get_headers(), + verify=self._verify_ssl_certs, + allow_redirects=False + ) + if response.status_code == 200: + self.set_username(self._session_username) + return { + "username": self._session_username, + "session": self._session_token, + "device_token": self._http_client.cookies['DT'] + } + else: + self._http_client.cookies.clear(name='sid', domain=match.group(1), path='/') + login_response = self.auth() session_url = self._okta_org_url + '/login/sessionCookieRedirect' From 7c839b96b5727d0b9d17871a336205440d18de36 Mon Sep 17 00:00:00 2001 From: Zeph / Liz Loss-Cutler-Hull Date: Wed, 5 Oct 2022 10:21:47 -0700 Subject: [PATCH 33/58] Add an option to disable session persistence. For those cases where you don't want to persist things between invocations for some reason. --- README.md | 3 ++- gimme_aws_creds/config.py | 15 +++++++++++++++ gimme_aws_creds/main.py | 5 +++-- 3 files changed, 20 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 255d68e5..3a3cc16b 100644 --- a/README.md +++ b/README.md @@ -109,7 +109,7 @@ A configuration wizard will prompt you to enter the necessary configuration para - aws_default_duration = This is optional. Lifetime for temporary credentials, in seconds. Defaults to 1 hour (3600) - app_url - If using 'appurl' setting for gimme_creds_server, this sets the url to the aws application configured in Okta. It is typically something like - okta_username - use this username to authenticate -- preferred_mfa_type - automatically select a particular device when prompted for MFA: +- preferred_mfa_type - automatically select a particular device when prompted for MFA: - push - Okta Verify App push or DUO push (depends on okta supplied provider type) - token:software:totp - OTP using the Okta Verify App - token:hardware - OTP using hardware like Yubikey @@ -123,6 +123,7 @@ A configuration wizard will prompt you to enter the necessary configuration para - remember_device - y or n. If yes, the MFA device will be remembered by Okta service for a limited time. This option can also be set interactively in the command line using `-m` or `--remember-device` - output_format - `json` or `export`, determines default credential output format, can be also specified by `--output-format FORMAT` and `-o FORMAT`. - use_keyring - y or n. Defaults to y. If n, use of the system keyring for password storage is disabled. +- disable_session - y or n. Defaults to n. If y, disables using the session token between gimmie-aws-creds invocations. ## Configuration File diff --git a/gimme_aws_creds/config.py b/gimme_aws_creds/config.py index 7c7d28c1..3b31ec4e 100644 --- a/gimme_aws_creds/config.py +++ b/gimme_aws_creds/config.py @@ -226,6 +226,7 @@ def update_config_file(self): preferred_mfa_type = Select this MFA device type automatically include_path - (optional) includes that full role path to the role name for profile use_keyring - Enables or disables use of the system keyring + disable_session - Disables persistent sessions. """ config = configparser.ConfigParser() @@ -251,6 +252,7 @@ def update_config_file(self): 'device_token': '', 'output_format': 'export', 'use_keyring': 'y', + 'disable_session': 'n', } # See if a config file already exists. @@ -296,6 +298,7 @@ def update_config_file(self): config_dict['cred_profile'] = defaults['cred_profile'] config_dict['use_keyring'] = self.get_use_keyring(defaults['use_keyring']) + config_dict['disable_session'] = self.get_disable_session(defaults['disable_session']) self.write_config_file(config_dict) @@ -544,6 +547,18 @@ def _get_use_keyring(self, default_entry): except ValueError: ui.default.warning("Remember the value must be either y or n.") + def _get_disable_session(self, default_entry): + """Option to disable storing and using the session token between invocations.""" + ui.default.message( + "Do you want to disable storing and using the session token between invocations?\n" + "Please answer y or n.") + while True: + try: + return self._get_user_input_yes_no( + "Disable session token storage", default_entry) + except ValueError: + ui.default.warning("Remember the value must be either y or n.") + def _get_user_input(self, message, default=None): """formats message to include default and then prompts user for input via keyboard with message. Returns user's input or if user doesn't diff --git a/gimme_aws_creds/main.py b/gimme_aws_creds/main.py index 417b7a9c..783e26d8 100644 --- a/gimme_aws_creds/main.py +++ b/gimme_aws_creds/main.py @@ -562,8 +562,9 @@ def okta(self): else: okta.set_use_keyring(True) - if self.conf_dict.get('session_token') is not None and self.conf_dict.get('session_username') is not None: - okta.set_session_token(self.conf_dict.get('session_username'), self.conf_dict.get('session_token')) + if self.conf_dict.get('disable_session', 'False') not in ('n', 'false', 'False'): + if self.conf_dict.get('session_token') is not None and self.conf_dict.get('session_username') is not None: + okta.set_session_token(self.conf_dict.get('session_username'), self.conf_dict.get('session_token')) return okta From bdfdf99bc41f753f2c1b6fc618ef365ffa6bce68 Mon Sep 17 00:00:00 2001 From: Zeph / Liz Loss-Cutler-Hull Date: Fri, 7 Oct 2022 06:15:12 -0700 Subject: [PATCH 34/58] Fix some whitespace related linter warnings. --- gimme_aws_creds/config.py | 4 +-- gimme_aws_creds/u2f.py | 2 +- tests/test_aws_resolver.py | 63 +++++++++++++++++++------------------- 3 files changed, 34 insertions(+), 35 deletions(-) diff --git a/gimme_aws_creds/config.py b/gimme_aws_creds/config.py index 3b31ec4e..682a4e63 100644 --- a/gimme_aws_creds/config.py +++ b/gimme_aws_creds/config.py @@ -175,7 +175,7 @@ def get_args(self): self.roles = [role.strip() for role in args.roles.split(',') if role.strip()] self.conf_profile = args.profile or 'DEFAULT' - def _handle_config(self, config, profile_config, include_inherits = True): + def _handle_config(self, config, profile_config, include_inherits=True): if "inherits" in profile_config.keys() and include_inherits: self.ui.message("Using inherited config: " + profile_config["inherits"]) if profile_config["inherits"] not in config: @@ -189,7 +189,7 @@ def _handle_config(self, config, profile_config, include_inherits = True): else: return profile_config - def get_config_dict(self, include_inherits = True): + def get_config_dict(self, include_inherits=True): """returns the conf dict from the okta config file""" # Check to see if config file exists, if not complain and exit # If config file does exist return config dict from file diff --git a/gimme_aws_creds/u2f.py b/gimme_aws_creds/u2f.py index 75bf70ce..aa6df54e 100644 --- a/gimme_aws_creds/u2f.py +++ b/gimme_aws_creds/u2f.py @@ -61,7 +61,7 @@ def work(self, client): for _ in range(30): try: self._signature = client.authenticate( - self._nonce, self._appId, self._credentialId ) + self._nonce, self._appId, self._credentialId) except ApduError as e: if e.code == APDU.USE_NOT_SATISFIED: if not self._has_prompted: diff --git a/tests/test_aws_resolver.py b/tests/test_aws_resolver.py index 5fc70be7..9c8abb77 100644 --- a/tests/test_aws_resolver.py +++ b/tests/test_aws_resolver.py @@ -30,7 +30,7 @@ def setUp(self): """Set up for the unit tests""" self.resolver = self.setUp_client(False) - self.aws_signinpage = """ + self.aws_signinpage = r""" @@ -41,7 +41,7 @@ def setUp(self): - + - + Conditions d'utilisation Politique de confidentialité
- - + + - + @@ -2111,4 +2111,3 @@ def test_display_role(self): assert result[4] == self.display_role[4] assert result[5] == self.display_role[5] assert result[6] == self.display_role[6] - From fcb13b63f6c91e6dfd39962fca7a71b991cc619f Mon Sep 17 00:00:00 2001 From: Zeph / Liz Loss-Cutler-Hull Date: Fri, 7 Oct 2022 06:47:46 -0700 Subject: [PATCH 35/58] Specify ctap-keyring-device differently. This works better with setuputils. Note: It is my understanding that pypi won't accept releases that point directly at URLs in this manner, so we really need a release of ctap-keyring-device 2.0. --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index ea7083c8..7048f7e0 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,6 +8,6 @@ okta>=2.0.0 # ctap-keyring-device>=1.0.6 # We need ctap-keyring-device>=2.0.0, but that hasn't been released yet. # So we're grabbing the git release with the applicable fixes. -git+https://github.com/dany74q/ctap-keyring-device@de144921c6a0fb005422a6e3dc80966f01663e0a +ctap-keyring-device @ git+https://github.com/dany74q/ctap-keyring-device@de144921c6a0fb005422a6e3dc80966f01663e0a pwinput>=1.0.2 defusedxml From 937d224b886417d4e8675bfb2c283c4737cdfd69 Mon Sep 17 00:00:00 2001 From: Zeph / Liz Loss-Cutler-Hull Date: Wed, 26 Oct 2022 10:03:09 -0700 Subject: [PATCH 36/58] Catch KeyError instead of AttributeError. This should work just a little better. --- gimme_aws_creds/okta.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/gimme_aws_creds/okta.py b/gimme_aws_creds/okta.py index 6c7fc0c7..6c7788dc 100644 --- a/gimme_aws_creds/okta.py +++ b/gimme_aws_creds/okta.py @@ -646,13 +646,13 @@ def _check_push_result(self, state_token, login_data): response_data = response.json() # We could check for the existence of every layer... - # Or we can just catch AttributeError and pass, with a nosec comment. + # Or we can just catch KeyError and pass, with a nosec comment. try: if response_data['_embedded']['factor']['_embedded']['challenge']['correctAnswer']: if self._print_correct_answer: self._print_correct_answer(response_data['_embedded']['factor']['_embedded']['challenge']['correctAnswer']) self._print_correct_answer = None - except AttributeError: # nosec + except KeyError: # nosec pass if 'stateToken' in response_data: From 6fc7b67db6b4059339398b8846011d928c5d75d9 Mon Sep 17 00:00:00 2001 From: Zeph / Liz Loss-Cutler-Hull Date: Fri, 28 Oct 2022 09:36:07 -0700 Subject: [PATCH 37/58] Correct the check for disable_session. Little things... --- gimme_aws_creds/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gimme_aws_creds/main.py b/gimme_aws_creds/main.py index 783e26d8..7c8772e1 100644 --- a/gimme_aws_creds/main.py +++ b/gimme_aws_creds/main.py @@ -562,7 +562,7 @@ def okta(self): else: okta.set_use_keyring(True) - if self.conf_dict.get('disable_session', 'False') not in ('n', 'false', 'False'): + if self.conf_dict.get('disable_session', 'False') not in ('y', 'true', 'True'): if self.conf_dict.get('session_token') is not None and self.conf_dict.get('session_username') is not None: okta.set_session_token(self.conf_dict.get('session_username'), self.conf_dict.get('session_token')) From 435920e15d7122ac4fedf8f051aaeb3f4e2af645 Mon Sep 17 00:00:00 2001 From: Zeph / Liz Loss-Cutler-Hull Date: Fri, 28 Oct 2022 09:37:37 -0700 Subject: [PATCH 38/58] Fix handling of the name for multiple keys. It worked for me up until the first device was one that was registered. --- gimme_aws_creds/okta.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/gimme_aws_creds/okta.py b/gimme_aws_creds/okta.py index 6c7788dc..3324a22b 100644 --- a/gimme_aws_creds/okta.py +++ b/gimme_aws_creds/okta.py @@ -938,6 +938,10 @@ def _build_factor_name(self, factor): elif factor['factorType'] == 'u2f': return factor['factorType'] + ": " + factor['factorType'] elif factor['factorType'] == 'webauthn': + default_factor_name = factor['profile'].get('authenticatorName') or factor['factorType'] + if default_factor_name == "Any configured key": + return factor['factorType'] + ": " + default_factor_name + factor_name = None try: registered_authenticators = RegisteredAuthenticators(self.ui) @@ -953,7 +957,6 @@ def _build_factor_name(self, factor): self.ui.info("Error getting authenticator name: {}".format(e)) pass - default_factor_name = factor['profile'].get('authenticatorName') or factor['factorType'] factor_name = factor_name or default_factor_name return factor['factorType'] + ": " + factor_name From eb82374ad27deee00d2a89494c972250e6ec97b0 Mon Sep 17 00:00:00 2001 From: Zeph / Liz Loss-Cutler-Hull Date: Fri, 28 Oct 2022 09:39:07 -0700 Subject: [PATCH 39/58] Add a new internal error. This will be used for the case where no eligible webauthn devices are found, even if webauthn devices are found. --- gimme_aws_creds/errors.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/gimme_aws_creds/errors.py b/gimme_aws_creds/errors.py index 79cebaaf..b4c38d5c 100644 --- a/gimme_aws_creds/errors.py +++ b/gimme_aws_creds/errors.py @@ -70,6 +70,10 @@ class NoFIDODeviceFoundError(Exception): pass +class NoEligibleFIDODeviceFoundError(Exception): + pass + + class FIDODeviceTimeoutError(Exception): pass From d22fe3b2b0e6e90a4dba2ce2223205788de720a7 Mon Sep 17 00:00:00 2001 From: Zeph / Liz Loss-Cutler-Hull Date: Fri, 28 Oct 2022 09:40:57 -0700 Subject: [PATCH 40/58] Rework error handling around calls to webauthn. We now catch 'expected' errors from our webauthn code, cancel the operation with Okta, print an error message, and exit with a failure status. This isn't perfect, but it's definitely better. --- gimme_aws_creds/okta.py | 28 +++++++++++++++++++++------- 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/gimme_aws_creds/okta.py b/gimme_aws_creds/okta.py index 3324a22b..b0d50433 100644 --- a/gimme_aws_creds/okta.py +++ b/gimme_aws_creds/okta.py @@ -31,9 +31,9 @@ from requests.adapters import HTTPAdapter, Retry from gimme_aws_creds.u2f import FactorU2F -from gimme_aws_creds.webauthn import WebAuthnClient, FakeAssertion +from gimme_aws_creds.webauthn import WebAuthnClient from . import errors, version, duo -from .errors import GimmeAWSCredsMFAEnrollStatus +from .errors import GimmeAWSCredsMFAEnrollStatus, NoFIDODeviceFoundError, FIDODeviceTimeoutError, GimmeAWSCredsExitError, FIDODeviceError, NoEligibleFIDODeviceFoundError from .registered_authenticators import RegisteredAuthenticators @@ -710,12 +710,16 @@ def _check_webauthn_result(self, state_token, login_data): """ Authenticator """ webauthn_client = WebAuthnClient(self.ui, self._okta_org_url, nonce, cred_list) - # noinspection PyBroadException try: client_data, assertion = webauthn_client.verify() - except Exception: - client_data = b'fake' - assertion = FakeAssertion() + except (NoFIDODeviceFoundError, FIDODeviceTimeoutError, FIDODeviceError, NoEligibleFIDODeviceFoundError) as e: + response = self._http_client.post( + login_data['_links']['cancel']['href'], + json={"stateToken": state_token}, + headers=self._get_headers(), verify=self._verify_ssl_certs) + response.raise_for_status() + + raise GimmeAWSCredsExitError(e) client_data = str(base64.urlsafe_b64encode(client_data), "utf-8") signature_data = base64.b64encode(assertion.signature).decode('utf-8') @@ -1083,7 +1087,17 @@ def _activate_webauthn_factor(self, state_token): user_obj = activation_obj.get('user', {}) webauthn_client = WebAuthnClient(self.ui, self._okta_org_url, challenge) - client_data_json, attestation = webauthn_client.make_credential(user_obj) + try: + client_data_json, attestation = webauthn_client.make_credential(user_obj) + except (NoFIDODeviceFoundError, FIDODeviceTimeoutError, FIDODeviceError, NoEligibleFIDODeviceFoundError) as e: + response = self._http_client.post( + response_json['_links']['cancel']['href'], + json={"stateToken": state_token}, + headers=self._get_headers(), verify=self._verify_ssl_certs) + response.raise_for_status() + + raise GimmeAWSCredsExitError(e) + client_data = str(base64.urlsafe_b64encode(client_data_json), 'utf-8') attestation_data = str(base64.urlsafe_b64encode(attestation), 'utf-8') From d012bf532fbec44b88adc6e8499a825f079baf9b Mon Sep 17 00:00:00 2001 From: Zeph / Liz Loss-Cutler-Hull Date: Fri, 28 Oct 2022 09:43:25 -0700 Subject: [PATCH 41/58] Tweak the session username check. --- gimme_aws_creds/okta.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/gimme_aws_creds/okta.py b/gimme_aws_creds/okta.py index b0d50433..3c3045b8 100644 --- a/gimme_aws_creds/okta.py +++ b/gimme_aws_creds/okta.py @@ -174,7 +174,7 @@ def auth(self): def auth_session(self, **kwargs): """ Authenticate the user and return the Okta Session ID and username""" - if self._session_username is not None and self._session_token is not None: + if self._session_username is not None and self._session_token is not None and self._session_username == self._username: match = re.search(r'^https://(.*)/?', self._okta_org_url) self._http_client.cookies.set('sid', self._session_token, domain=match.group(1), path='/') session_url = self._okta_org_url + '/api/v1/sessions/me/lifecycle/refresh' @@ -185,7 +185,6 @@ def auth_session(self, **kwargs): allow_redirects=False ) if response.status_code == 200: - self.set_username(self._session_username) return { "username": self._session_username, "session": self._session_token, From 15c6d34db17738e5b39e78ff07febcfd35d34a4f Mon Sep 17 00:00:00 2001 From: Zeph / Liz Loss-Cutler-Hull Date: Fri, 28 Oct 2022 09:45:21 -0700 Subject: [PATCH 42/58] Drop the now unused FakeAssertion. --- gimme_aws_creds/webauthn.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/gimme_aws_creds/webauthn.py b/gimme_aws_creds/webauthn.py index 7060de1f..bc4c7717 100644 --- a/gimme_aws_creds/webauthn.py +++ b/gimme_aws_creds/webauthn.py @@ -30,12 +30,6 @@ from gimme_aws_creds.errors import NoFIDODeviceFoundError, FIDODeviceTimeoutError -class FakeAssertion(object): - def __init__(self): - self.signature = b'fake' - self.auth_data = b'fake' - - class UI(UserInteraction): def __init__(self, ui): self.ui = ui From b07e1e6d6f64320d1d2e4c9ac7b5e415a9229ffc Mon Sep 17 00:00:00 2001 From: Zeph / Liz Loss-Cutler-Hull Date: Fri, 28 Oct 2022 09:48:42 -0700 Subject: [PATCH 43/58] Give different 'please touch' text per operation. After all, especially with session handling, it can be confusing on if we want you to touch your existing registered device, or a device to be registered. --- gimme_aws_creds/webauthn.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/gimme_aws_creds/webauthn.py b/gimme_aws_creds/webauthn.py index bc4c7717..d2ac7dcf 100644 --- a/gimme_aws_creds/webauthn.py +++ b/gimme_aws_creds/webauthn.py @@ -34,10 +34,14 @@ class UI(UserInteraction): def __init__(self, ui): self.ui = ui self._has_prompted = False + self._prompt_text = "" + + def set_prompt_text(self, text): + self._prompt_text = text def prompt_up(self) -> None: if not self._has_prompted: - self.ui.info('\nTouch your authenticator device now...\n') + self.ui.info('\nTouch your {} now...\n'.format(self._prompt_text)) self._has_prompted = True def request_pin( @@ -101,6 +105,7 @@ def on_keepalive(self, status): self._has_prompted = True def verify(self): + self.user_interaction.set_prompt_text("registered authentication device") self._run_in_thread(self._verify) return self._client_data, self._assertions[0] @@ -133,6 +138,7 @@ def _verify(self, client): return def make_credential(self, user): + self.user_interaction.set_prompt_text("new authentication device") self._run_in_thread(self._make_credential, user) return self._client_data, self._attestation.with_string_keys() From 2a6b617b16dede5b5807650b36a2f5c6930a345d Mon Sep 17 00:00:00 2001 From: Zeph / Liz Loss-Cutler-Hull Date: Fri, 28 Oct 2022 09:50:00 -0700 Subject: [PATCH 44/58] Remove the dead on_keepalive. --- gimme_aws_creds/okta.py | 2 +- gimme_aws_creds/webauthn.py | 5 ----- 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/gimme_aws_creds/okta.py b/gimme_aws_creds/okta.py index 3c3045b8..f20941ec 100644 --- a/gimme_aws_creds/okta.py +++ b/gimme_aws_creds/okta.py @@ -1087,7 +1087,7 @@ def _activate_webauthn_factor(self, state_token): webauthn_client = WebAuthnClient(self.ui, self._okta_org_url, challenge) try: - client_data_json, attestation = webauthn_client.make_credential(user_obj) + client_data_json, attestation = webauthn_client.make_credential(activation_obj) except (NoFIDODeviceFoundError, FIDODeviceTimeoutError, FIDODeviceError, NoEligibleFIDODeviceFoundError) as e: response = self._http_client.post( response_json['_links']['cancel']['href'], diff --git a/gimme_aws_creds/webauthn.py b/gimme_aws_creds/webauthn.py index d2ac7dcf..97c6f694 100644 --- a/gimme_aws_creds/webauthn.py +++ b/gimme_aws_creds/webauthn.py @@ -99,11 +99,6 @@ def locate_device(self): self._clients = [Fido2Client(device=d, origin=self._okta_org_url, user_interaction=self.user_interaction) for d in devs] - def on_keepalive(self, status): - if status == STATUS.UPNEEDED and not self._has_prompted: - self.ui.info('\nTouch your authenticator device now...\n') - self._has_prompted = True - def verify(self): self.user_interaction.set_prompt_text("registered authentication device") self._run_in_thread(self._verify) From 6ad690da88234da7cf293122bc0560d5b4a46da0 Mon Sep 17 00:00:00 2001 From: Zeph / Liz Loss-Cutler-Hull Date: Fri, 28 Oct 2022 09:50:50 -0700 Subject: [PATCH 45/58] Rework keyring devices a little. Two things, first, we now always attempt to get the list of keyring devices, and if we are successful, we concat the two lists of devices. Second, we catch a known error in CtapKeyringDevice, and if we encounter it we print a message pointing at the PR which should fix the error. --- gimme_aws_creds/webauthn.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/gimme_aws_creds/webauthn.py b/gimme_aws_creds/webauthn.py index 97c6f694..31300632 100644 --- a/gimme_aws_creds/webauthn.py +++ b/gimme_aws_creds/webauthn.py @@ -94,8 +94,10 @@ def __init__(self, ui, okta_org_url, challenge, credential_id=None, timeout_ms=3 def locate_device(self): # Locate a device devs = list(CtapHidDevice.list_devices()) - if not devs: - devs = CtapKeyringDevice.list_devices() + try: + devs += list(CtapKeyringDevice.list_devices()) + except (TypeError, ValueError): + self.ui.info("PR not yet merged, keyring devices will not be found. https://github.com/dany74q/ctap-keyring-device/pull/10") self._clients = [Fido2Client(device=d, origin=self._okta_org_url, user_interaction=self.user_interaction) for d in devs] From 394728d23cfd47bc33544aeaf09aaa044adfd0ce Mon Sep 17 00:00:00 2001 From: Zeph / Liz Loss-Cutler-Hull Date: Fri, 28 Oct 2022 09:54:03 -0700 Subject: [PATCH 46/58] Rework _run_in_thread error handling. self.locate_device() never raises an error, so we need to check for _clients being empty ourselves, and if it is still empty after the second try, we raise the error ourselves. There is now a self._exception, which is a list of exceptions. Functions run in threads are expected to append any errors to this list. At the end, we check the list of errors. We default to FIDODeviceTimeoutError('Operation timed out') We then loop through the exception list, if we have instances of only FIDODeviceTimeoutError, we keep the default exception, if we find an instance of NoEligibleFIDODeviceFoundError, we switch to an error of NoEligibleFIDODeviceFoundError('No eligible authentication devices found.'), and if we have any other exception on the list, we pick that one instead over all of the others, wrapping it in a FIDODeviceError. We then throw whatever error we ended up with. These errors are now caught by the calling okta code. This should vastly improve the user experience around errors. --- gimme_aws_creds/webauthn.py | 27 ++++++++++++++++++++------- 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/gimme_aws_creds/webauthn.py b/gimme_aws_creds/webauthn.py index 31300632..1fdc4aab 100644 --- a/gimme_aws_creds/webauthn.py +++ b/gimme_aws_creds/webauthn.py @@ -27,7 +27,7 @@ from fido2.webauthn import PublicKeyCredentialRequestOptions from typing import Optional -from gimme_aws_creds.errors import NoFIDODeviceFoundError, FIDODeviceTimeoutError +from gimme_aws_creds.errors import NoFIDODeviceFoundError, FIDODeviceTimeoutError, NoEligibleFIDODeviceFoundError, FIDODeviceError class UI(UserInteraction): @@ -80,6 +80,7 @@ def __init__(self, ui, okta_org_url, challenge, credential_id=None, timeout_ms=3 self._client_data = None self._rp = {'id': okta_org_url[8:], 'name': okta_org_url[8:]} self._allow_list = [] + self._exception = None self.user_interaction = UI(ui=ui) if credential_id: @@ -151,24 +152,36 @@ def _make_credential(self, client, user): def _run_in_thread(self, method, *args, **kwargs): # If authenticator is not found, prompt - try: - self.locate_device() - except NoFIDODeviceFoundError: + self.locate_device() + + if self._clients == []: self.ui.input('Please insert your security key and press enter...') self.locate_device() + if self._clients == []: + raise NoFIDODeviceFoundError("No authentication device found.") + + self._exception = [] threads = [] for client in self._clients: t = Thread(target=method, args=(client,) + args, kwargs=kwargs) + self.ui.info("Added client.") threads.append(t) t.start() for t in threads: t.join() - if not self._event.is_set(): - self.ui.info('Operation timed out or no valid Security Key found !') - raise FIDODeviceTimeoutError + main_error = FIDODeviceTimeoutError('Operation timed out') + + for e in self._exception: + if isinstance(e, NoEligibleFIDODeviceFoundError) and not isinstance(main_error, FIDODeviceError): + main_error = NoEligibleFIDODeviceFoundError('No eligible authentication devices found.') + elif not isinstance(e, FIDODeviceTimeoutError): + main_error = FIDODeviceError("Error: {}".format(e)) + self.ui.info("Error from Webauthn key: {}".format(e)) + + raise main_error @staticmethod def _get_user_verification_requirement_from_client(client): From 0e9af189ce23a27ee79adf3f23b386d6a425e8be Mon Sep 17 00:00:00 2001 From: Zeph / Liz Loss-Cutler-Hull Date: Fri, 28 Oct 2022 10:00:37 -0700 Subject: [PATCH 47/58] Rewrite make_credential. With current versions of python-fido2, this needed a fair bit of attention, and while I was at it, I started actually paying attention to a lot more of what Okta is sending us in regards to parameters for the credentials. There is also a workaround for a python-fido2 bug here, with a comment that has a link to the bug report. --- gimme_aws_creds/webauthn.py | 72 ++++++++++++++++++++++++++++++------- 1 file changed, 60 insertions(+), 12 deletions(-) diff --git a/gimme_aws_creds/webauthn.py b/gimme_aws_creds/webauthn.py index 1fdc4aab..623aaabb 100644 --- a/gimme_aws_creds/webauthn.py +++ b/gimme_aws_creds/webauthn.py @@ -13,17 +13,19 @@ from __future__ import print_function, absolute_import, unicode_literals import pwinput +import humps + from threading import Event, Thread from ctap_keyring_device.ctap_keyring_device import CtapKeyringDevice from ctap_keyring_device.ctap_strucs import CtapOptions -from fido2 import cose from fido2.client import Fido2Client, ClientError, UserInteraction from fido2.ctap2.pin import ClientPin -from fido2.hid import CtapHidDevice, STATUS +from fido2.hid import CtapHidDevice from fido2.utils import websafe_decode from fido2.webauthn import PublicKeyCredentialCreationOptions, \ - PublicKeyCredentialType, PublicKeyCredentialParameters, PublicKeyCredentialDescriptor, UserVerificationRequirement + PublicKeyCredentialType, PublicKeyCredentialParameters, PublicKeyCredentialDescriptor, UserVerificationRequirement, \ + PublicKeyCredentialUserEntity, AuthenticatorSelectionCriteria from fido2.webauthn import PublicKeyCredentialRequestOptions from typing import Optional @@ -135,17 +137,63 @@ def _verify(self, client): else: return - def make_credential(self, user): + def make_credential(self, activation): self.user_interaction.set_prompt_text("new authentication device") - self._run_in_thread(self._make_credential, user) - return self._client_data, self._attestation.with_string_keys() - - def _make_credential(self, client, user): - pub_key_cred_params = [PublicKeyCredentialParameters(PublicKeyCredentialType.PUBLIC_KEY, cose.ES256.ALGORITHM)] - options = PublicKeyCredentialCreationOptions(self._rp, user, self._challenge, pub_key_cred_params, - timeout=self._timeout_ms) + self._run_in_thread(self._make_credential, activation) + return self._client_data, self._attestation + + def _make_credential(self, client, activation): + # Generate the list of acceptable key parameters from the Okta JSON. + pub_key_cred_params = [] + for param in activation['pubKeyCredParams']: + pub_key_cred_params.append(PublicKeyCredentialParameters(**param)) + + # Generate the list of excluded key IDs, if any, from the Okta JSON. + # This will generally include all currently enrolled keys. + exclude = [] + for id in activation['excludeCredentials']: + exclude.append(PublicKeyCredentialDescriptor(id['type'], websafe_decode(id['id']))) + + # Generate the user entity from the Okta JSON. + # Note: + # display_name has a different name in the JSON, I'm not sure if it can be absent entirely. + # id must be converted to bytes, not passed directly, otherwise things will fail. + user_json = humps.decamelize(activation['user']) + user_json['id'] = bytes(user_json['id'], encoding='utf-8') + user = PublicKeyCredentialUserEntity(**user_json) + + # Generate the authn selection from the Okta JSON, decamelized. + selection_json = humps.decamelize(activation['authenticatorSelection']) + # Work around https://github.com/Yubico/python-fido2/issues/162 + if not (client.info.options.get("uv") or client.info.options.get("pinUvAuthToken")): + selection_json['user_verification'] = None + + auth_selection = AuthenticatorSelectionCriteria(**selection_json) + + # And build the public key credential creation options. + options = PublicKeyCredentialCreationOptions( + rp=self._rp, + user=user, + challenge=self._challenge, + pub_key_cred_params=activation['pubKeyCredParams'], + timeout=self._timeout_ms, + exclude_credentials=exclude, + attestation=activation['attestation'], + authenticator_selection=auth_selection) - attestation_res = client.make_credential(options, event=self._event) + try: + attestation_res = client.make_credential(options, event=self._event) + except ClientError as e: + if e.code == ClientError.ERR.DEVICE_INELIGIBLE: + self.ui.info('Security key is ineligible: {}\n'.format(e.cause)) + self._exception.append(NoEligibleFIDODeviceFoundError("Security key is ineligible: {}".format(e.cause))) + return + elif e.code == ClientError.ERR.TIMEOUT: + return + else: + self.ui.info("Exception: {}\n".format(e)) + self._exception.append(e) + return self._attestation, self._client_data = attestation_res.attestation_object, attestation_res.client_data self._event.set() From d6c4a574f59748629dc1f4f597fb5b03334461d2 Mon Sep 17 00:00:00 2001 From: Zeph / Liz Loss-Cutler-Hull Date: Fri, 28 Oct 2022 10:07:10 -0700 Subject: [PATCH 48/58] Move _verify to the new exception handling. --- gimme_aws_creds/webauthn.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/gimme_aws_creds/webauthn.py b/gimme_aws_creds/webauthn.py index 623aaabb..dee769e8 100644 --- a/gimme_aws_creds/webauthn.py +++ b/gimme_aws_creds/webauthn.py @@ -120,19 +120,21 @@ def _verify(self, client): self.ui.info('Processing...\n') self._assertions = assertion_selection.get_assertions() if len(self._assertions) < 0: - self.ui.info('No assertions from key.') - raise + self._exception.append(RuntimeError("No assertions from key.")) + return assertion_res = assertion_selection.get_response(0) self._client_data = assertion_res.client_data self._event.set() except ClientError as e: if e.code == ClientError.ERR.DEVICE_INELIGIBLE: - self.ui.info('Security key is ineligible') # TODO extract key info + self.ui.info('Security key is ineligible: {}\n'.format(e.cause)) + self._exception.append(NoEligibleFIDODeviceFoundError("Security key is ineligible: {}".format(e.cause))) return elif e.code != ClientError.ERR.TIMEOUT: - raise + self._exception.append(e) + return else: return From 9e10e53b5faedb786e5fc23a90b1e5331893ce6c Mon Sep 17 00:00:00 2001 From: Zeph / Liz Loss-Cutler-Hull Date: Fri, 28 Oct 2022 10:07:24 -0700 Subject: [PATCH 49/58] Remove an unneeded error print. --- gimme_aws_creds/webauthn.py | 1 - 1 file changed, 1 deletion(-) diff --git a/gimme_aws_creds/webauthn.py b/gimme_aws_creds/webauthn.py index dee769e8..b39ce048 100644 --- a/gimme_aws_creds/webauthn.py +++ b/gimme_aws_creds/webauthn.py @@ -193,7 +193,6 @@ def _make_credential(self, client, activation): elif e.code == ClientError.ERR.TIMEOUT: return else: - self.ui.info("Exception: {}\n".format(e)) self._exception.append(e) return From e5ff5818375a301d9726d5c147535a830e286918 Mon Sep 17 00:00:00 2001 From: Zeph / Liz Loss-Cutler-Hull Date: Mon, 28 Nov 2022 09:28:57 -0800 Subject: [PATCH 50/58] Remove spaces to maybe fix nix. --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 7048f7e0..fd0538c4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,6 +8,6 @@ okta>=2.0.0 # ctap-keyring-device>=1.0.6 # We need ctap-keyring-device>=2.0.0, but that hasn't been released yet. # So we're grabbing the git release with the applicable fixes. -ctap-keyring-device @ git+https://github.com/dany74q/ctap-keyring-device@de144921c6a0fb005422a6e3dc80966f01663e0a +ctap-keyring-device@git+https://github.com/dany74q/ctap-keyring-device@de144921c6a0fb005422a6e3dc80966f01663e0a pwinput>=1.0.2 defusedxml From 062c506b40e1a95351cfe7ec7eee3a2efd271973 Mon Sep 17 00:00:00 2001 From: Zeph / Liz Loss-Cutler-Hull Date: Mon, 28 Nov 2022 10:53:35 -0800 Subject: [PATCH 51/58] Try installing git before running nix build. --- .github/workflows/build_nix.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/build_nix.yml b/.github/workflows/build_nix.yml index d7fad469..0460b65a 100644 --- a/.github/workflows/build_nix.yml +++ b/.github/workflows/build_nix.yml @@ -12,5 +12,7 @@ jobs: with: extra_nix_config: | access-tokens = github.com=${{ secrets.GITHUB_TOKEN }} + - name: Install git + run: apt-get -y install git - name: Building package run: nix build . From b670eae4c375209b54fbfb734ad9cb025d017d44 Mon Sep 17 00:00:00 2001 From: Zeph / Liz Loss-Cutler-Hull Date: Mon, 28 Nov 2022 10:53:55 -0800 Subject: [PATCH 52/58] Drop python 3.6, add 3.x. --- .github/workflows/cicd.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/cicd.yml b/.github/workflows/cicd.yml index 50be0880..ff6207da 100644 --- a/.github/workflows/cicd.yml +++ b/.github/workflows/cicd.yml @@ -14,7 +14,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - python-version: [ 3.6, 3.7, 3.8, 3.9 ] + python-version: [ 3.7, 3.8, 3.9, 3.x ] steps: - uses: actions/checkout@v2 From 1c98e83527ac3b8122b4f5d34d44c81f43156198 Mon Sep 17 00:00:00 2001 From: Zeph / Liz Loss-Cutler-Hull Date: Mon, 28 Nov 2022 10:58:07 -0800 Subject: [PATCH 53/58] Try to run apt-get as root. --- .github/workflows/build_nix.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/build_nix.yml b/.github/workflows/build_nix.yml index 0460b65a..b31b8d3b 100644 --- a/.github/workflows/build_nix.yml +++ b/.github/workflows/build_nix.yml @@ -7,12 +7,12 @@ jobs: build: runs-on: ubuntu-latest steps: + - name: Install git + run: sudo apt-get -y install git - uses: actions/checkout@v3 - uses: cachix/install-nix-action@v17 with: extra_nix_config: | access-tokens = github.com=${{ secrets.GITHUB_TOKEN }} - - name: Install git - run: apt-get -y install git - name: Building package run: nix build . From f89b7f62d73475aa190c6c1307377b6ccf8e8fc3 Mon Sep 17 00:00:00 2001 From: Zeph / Liz Loss-Cutler-Hull Date: Mon, 28 Nov 2022 11:02:15 -0800 Subject: [PATCH 54/58] Install git here too. --- .github/workflows/cicd.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/cicd.yml b/.github/workflows/cicd.yml index ff6207da..70611496 100644 --- a/.github/workflows/cicd.yml +++ b/.github/workflows/cicd.yml @@ -22,6 +22,8 @@ jobs: uses: actions/setup-python@v2 with: python-version: ${{ matrix.python-version }} + - name: Install git + run: sudo apt-get -y install git - name: Install dependencies run: | python -m pip install --upgrade pip From 7860269aeacb6c9ed29f8a7d3d6429e4d9196f4a Mon Sep 17 00:00:00 2001 From: Zeph / Liz Loss-Cutler-Hull Date: Mon, 28 Nov 2022 11:13:00 -0800 Subject: [PATCH 55/58] Install git in the Dockerfile. --- Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index 2ac3c6c8..2769a8eb 100644 --- a/Dockerfile +++ b/Dockerfile @@ -6,7 +6,7 @@ COPY . . RUN apk --update add libgcc -ENV PACKAGES="gcc musl-dev python3-dev libffi-dev openssl-dev cargo" +ENV PACKAGES="gcc musl-dev python3-dev libffi-dev openssl-dev cargo git" RUN apk --update add $PACKAGES RUN pip install --upgrade pip setuptools-rust From 2abaac6ad962565361b40f9891a3b688ab5c7b11 Mon Sep 17 00:00:00 2001 From: Zeph / Liz Loss-Cutler-Hull Date: Mon, 28 Nov 2022 11:13:17 -0800 Subject: [PATCH 56/58] Add humps to requirements.txt --- requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/requirements.txt b/requirements.txt index fd0538c4..ff9a2133 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,3 +11,4 @@ okta>=2.0.0 ctap-keyring-device@git+https://github.com/dany74q/ctap-keyring-device@de144921c6a0fb005422a6e3dc80966f01663e0a pwinput>=1.0.2 defusedxml +humps From 83f940d2c23ec88268776f9383db6806c24dbeff Mon Sep 17 00:00:00 2001 From: Zeph / Liz Loss-Cutler-Hull Date: Tue, 29 Nov 2022 14:24:17 -0800 Subject: [PATCH 57/58] Handle the case where we have no 'sid' cookie. --- gimme_aws_creds/okta.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/gimme_aws_creds/okta.py b/gimme_aws_creds/okta.py index f20941ec..a69e8601 100644 --- a/gimme_aws_creds/okta.py +++ b/gimme_aws_creds/okta.py @@ -191,7 +191,10 @@ def auth_session(self, **kwargs): "device_token": self._http_client.cookies['DT'] } else: - self._http_client.cookies.clear(name='sid', domain=match.group(1), path='/') + try: + self._http_client.cookies.clear(name='sid', domain=match.group(1), path='/') + except KeyError: + pass login_response = self.auth() From f417d5665caed0cf792f4278e3afcf81a83a1438 Mon Sep 17 00:00:00 2001 From: Zeph / Liz Loss-Cutler-Hull Date: Thu, 15 Dec 2022 00:06:26 -0800 Subject: [PATCH 58/58] Add an explicit requirement on urllib3 1.26+. --- requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/requirements.txt b/requirements.txt index ff9a2133..5630ca7a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,6 +3,7 @@ beautifulsoup4>=4.6.0,<5.0.0 configparser>=3.5.0,<4.0.0 keyring>=21.4.0 requests>=2.13.0,<3.0.0 +urllib3>=1.26 fido2>=1.0.0,<2.0.0 okta>=2.0.0 # ctap-keyring-device>=1.0.6