diff --git a/edx_rest_framework_extensions/auth/jwt/authentication.py b/edx_rest_framework_extensions/auth/jwt/authentication.py index 1a8d846a..bc892c86 100644 --- a/edx_rest_framework_extensions/auth/jwt/authentication.py +++ b/edx_rest_framework_extensions/auth/jwt/authentication.py @@ -107,7 +107,7 @@ def authenticate(self, request): # CSRF passed validation with authenticated user set_custom_attribute('jwt_auth_result', 'success-cookie') # adds additional monitoring for mismatches - self._is_jwt_cookie_and_session_user_mismatched(request, jwt_user=user_and_auth[0]) + self._monitor_successful_jwt_cookie_and_session_user_mismatch(request, jwt_user_id=user_and_auth[0].id) return user_and_auth except Exception as exception: @@ -122,7 +122,7 @@ def authenticate(self, request): if is_authenticating_with_jwt_cookie: # This check also adds monitoring details for all failed JWT cookies - is_user_mismatch = self._is_jwt_cookie_and_session_user_mismatched(request) + is_user_mismatch = self._is_failed_jwt_cookie_and_session_user_mismatch(request) if is_forgiving_jwt_cookies_enabled: if is_user_mismatch: set_custom_attribute('jwt_auth_result', 'user-mismatch-failure') @@ -229,15 +229,49 @@ def is_authenticating_with_jwt_cookie(cls, request): except Exception: # pylint: disable=broad-exception-caught return False - def _is_jwt_cookie_and_session_user_mismatched(self, request, jwt_user=None): + def _is_failed_jwt_cookie_and_session_user_mismatch(self, request): + """ + Returns True if failed JWT cookie and session user do not match, False otherwise. + + Notes: + - Must only be called in the case of a JWT cookie failure. + - Also provides monitoring details for mismatches. + """ + try: + cookie_token = JSONWebTokenAuthentication.get_token_from_cookies(request.COOKIES) + invalid_decoded_jwt = unsafe_jwt_decode_handler(cookie_token) + jwt_user_id = invalid_decoded_jwt.get('user_id', None) + jwt_user_id_attribute_value = jwt_user_id if jwt_user_id else 'not-found' # pragma: no cover + except Exception: # pylint: disable=broad-exception-caught + jwt_user_id = None + jwt_user_id_attribute_value = 'decode-error' + + # .. custom_attribute_name: failed_jwt_cookie_user_id + # .. custom_attribute_description: The user_id pulled from the failed + # JWT cookie. If the user_id claim is not found in the JWT, the attribute + # value will be 'not-found'. If the failed JWT simply can't be decoded, + # the attribute value will be 'decode-error'. + set_custom_attribute('failed_jwt_cookie_user_id', jwt_user_id_attribute_value) + + return self._is_jwt_cookie_and_session_user_mismatch(request, jwt_user_id) + + def _monitor_successful_jwt_cookie_and_session_user_mismatch(self, request, jwt_user_id): + """ + Provides monitoring when a successful JWT cookie and session user do not match. + + Notes: + - Must only be called in the case of a successful JWT cookie. + - Also provides monitoring details for mismatches. + """ + self._is_jwt_cookie_and_session_user_mismatch(request, jwt_user_id) + + def _is_jwt_cookie_and_session_user_mismatch(self, request, jwt_user_id): """ Returns True if JWT cookie and session user do not match, False otherwise. Arguments: request: The request. - jwt_user (User): The valid JWT user. If not user is supplied, it is assumed that - the cookie was invalid, and we attempt to get the user_id from the invalid - token. + jwt_user_id (int): The user_id of the JWT, None if not found. Other notes: - If ENABLE_JWT_VS_SESSION_USER_CHECK is toggled off, always return False. @@ -270,17 +304,6 @@ def _is_jwt_cookie_and_session_user_mismatched(self, request, jwt_user=None): else: session_user_id = None - if jwt_user: - jwt_user_id = jwt_user.id - else: - cookie_token = JSONWebTokenAuthentication.get_token_from_cookies(request.COOKIES) - invalid_decoded_jwt = unsafe_jwt_decode_handler(cookie_token) - jwt_user_id = invalid_decoded_jwt['user_id'] - # .. custom_attribute_name: invalid_jwt_cookie_user_id - # .. custom_attribute_description: The user_id pulled from the invalid/failed - # JWT cookie. - set_custom_attribute('invalid_jwt_cookie_user_id', jwt_user_id) - if not session_user_id or session_user_id == jwt_user_id: return False diff --git a/edx_rest_framework_extensions/auth/jwt/tests/test_authentication.py b/edx_rest_framework_extensions/auth/jwt/tests/test_authentication.py index 45548466..061077a3 100644 --- a/edx_rest_framework_extensions/auth/jwt/tests/test_authentication.py +++ b/edx_rest_framework_extensions/auth/jwt/tests/test_authentication.py @@ -342,14 +342,48 @@ def test_authenticate_jwt_and_session_mismatch(self, mock_set_custom_attribute): assert response.status_code == 200 @mock.patch('edx_rest_framework_extensions.auth.jwt.authentication.set_custom_attribute') - def test_authenticate_jwt_and_session_mismatch_invalid_cookie(self, mock_set_custom_attribute): - """ Tests monitoring for JWT cookie when there is a session user mismatch and invalid JWT cookie """ + def test_authenticate_jwt_and_session_mismatch_bad_signature_cookie(self, mock_set_custom_attribute): + """ Tests monitoring for JWT cookie with a bad signature when there is a session user mismatch """ session_user_id = 111 session_user = factories.UserFactory(id=session_user_id) jwt_user_id = 222 jwt_user = factories.UserFactory(id=jwt_user_id) self.client.cookies = SimpleCookie({ - jwt_cookie_name(): self._get_test_jwt_token(user=jwt_user, is_valid=False), + jwt_cookie_name(): self._get_test_jwt_token(user=jwt_user, is_valid_signature=False), + }) + + enable_forgiving_jwt_cookies = get_setting(ENABLE_FORGIVING_JWT_COOKIES) + with override_settings( + EDX_DRF_EXTENSIONS={ + ENABLE_FORGIVING_JWT_COOKIES: enable_forgiving_jwt_cookies, + ENABLE_JWT_VS_SESSION_USER_CHECK: True, + }, + MIDDLEWARE=( + 'django.contrib.sessions.middleware.SessionMiddleware', + 'django.contrib.auth.middleware.AuthenticationMiddleware', + ), + ROOT_URLCONF='edx_rest_framework_extensions.auth.jwt.tests.test_authentication', + ): + self.client.force_login(session_user) + response = self.client.get(reverse('authenticated-view')) + + mock_set_custom_attribute.assert_any_call('is_jwt_vs_session_user_check_enabled', True) + mock_set_custom_attribute.assert_any_call('jwt_auth_session_user_id', session_user_id) + mock_set_custom_attribute.assert_any_call('jwt_auth_and_session_user_mismatch', True) + mock_set_custom_attribute.assert_any_call('failed_jwt_cookie_user_id', jwt_user_id) + if enable_forgiving_jwt_cookies: + mock_set_custom_attribute.assert_any_call('jwt_auth_result', 'user-mismatch-failure') + else: + mock_set_custom_attribute.assert_any_call('jwt_auth_result', 'failed-cookie') + assert response.status_code == 401 + + @mock.patch('edx_rest_framework_extensions.auth.jwt.authentication.set_custom_attribute') + def test_authenticate_jwt_and_session_mismatch_invalid_cookie(self, mock_set_custom_attribute): + """ Tests monitoring for invalid JWT cookie when there is a session user mismatch """ + session_user_id = 111 + session_user = factories.UserFactory(id=session_user_id) + self.client.cookies = SimpleCookie({ + jwt_cookie_name(): 'invalid-cookie', }) enable_forgiving_jwt_cookies = get_setting(ENABLE_FORGIVING_JWT_COOKIES) @@ -370,7 +404,7 @@ def test_authenticate_jwt_and_session_mismatch_invalid_cookie(self, mock_set_cus mock_set_custom_attribute.assert_any_call('is_jwt_vs_session_user_check_enabled', True) mock_set_custom_attribute.assert_any_call('jwt_auth_session_user_id', session_user_id) mock_set_custom_attribute.assert_any_call('jwt_auth_and_session_user_mismatch', True) - mock_set_custom_attribute.assert_any_call('invalid_jwt_cookie_user_id', jwt_user_id) + mock_set_custom_attribute.assert_any_call('failed_jwt_cookie_user_id', 'decode-error') if enable_forgiving_jwt_cookies: mock_set_custom_attribute.assert_any_call('jwt_auth_result', 'user-mismatch-failure') else: @@ -456,11 +490,11 @@ def test_authenticate_jwt_and_session_mismatch_disabled(self, mock_set_custom_at assert 'jwt_auth_and_session_user_mismatch' not in set_custom_attribute_keys assert response.status_code == 200 - def _get_test_jwt_token(self, user=None, is_valid=True): + def _get_test_jwt_token(self, user=None, is_valid_signature=True): """ Returns a user and jwt token """ test_user = factories.UserFactory() if user is None else user payload = generate_latest_version_payload(test_user) - if is_valid: + if is_valid_signature: jwt_token = generate_jwt_token(payload) else: jwt_token = generate_jwt_token(payload, signing_key='invalid-key')