Skip to content

Commit

Permalink
fixup! refactor for review feedback
Browse files Browse the repository at this point in the history
- add separate methods to find mismatch with successful and
 failed JWT, to make differences more clear.
- add additional values for ``failed_jwt_cookie_user_id`` in
 the case that the user_id is not found, or the JWT can't be
 decoded.
- typo fix.
  • Loading branch information
robrap committed Oct 5, 2023
1 parent d41ac06 commit f21c62d
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 23 deletions.
57 changes: 40 additions & 17 deletions edx_rest_framework_extensions/auth/jwt/authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def authenticate(self, request):
# CSRF passed validation with authenticated user
set_custom_attribute('jwt_auth_result', 'success-cookie')
# adds additional monitoring for mismatches
self._is_jwt_cookie_and_session_user_mismatched(request, jwt_user=user_and_auth[0])
self._monitor_successful_jwt_cookie_and_session_user_mismatch(request, jwt_user_id=user_and_auth[0].id)
return user_and_auth

except Exception as exception:
Expand All @@ -122,7 +122,7 @@ def authenticate(self, request):

if is_authenticating_with_jwt_cookie:
# This check also adds monitoring details for all failed JWT cookies
is_user_mismatch = self._is_jwt_cookie_and_session_user_mismatched(request)
is_user_mismatch = self._is_failed_jwt_cookie_and_session_user_mismatch(request)
if is_forgiving_jwt_cookies_enabled:
if is_user_mismatch:
set_custom_attribute('jwt_auth_result', 'user-mismatch-failure')
Expand Down Expand Up @@ -229,15 +229,49 @@ def is_authenticating_with_jwt_cookie(cls, request):
except Exception: # pylint: disable=broad-exception-caught
return False

def _is_jwt_cookie_and_session_user_mismatched(self, request, jwt_user=None):
def _is_failed_jwt_cookie_and_session_user_mismatch(self, request):
"""
Returns True if failed JWT cookie and session user do not match, False otherwise.
Notes:
- Must only be called in the case of a JWT cookie failure.
- Also provides monitoring details for mismatches.
"""
try:
cookie_token = JSONWebTokenAuthentication.get_token_from_cookies(request.COOKIES)
invalid_decoded_jwt = unsafe_jwt_decode_handler(cookie_token)
jwt_user_id = invalid_decoded_jwt.get('user_id', None)
jwt_user_id_attribute_value = jwt_user_id if jwt_user_id else 'not-found' # pragma: no cover
except Exception: # pylint: disable=broad-exception-caught
jwt_user_id = None
jwt_user_id_attribute_value = 'decode-error'

# .. custom_attribute_name: failed_jwt_cookie_user_id
# .. custom_attribute_description: The user_id pulled from the failed
# JWT cookie. If the user_id claim is not found in the JWT, the attribute
# value will be 'not-found'. If the failed JWT simply can't be decoded,
# the attribute value will be 'decode-error'.
set_custom_attribute('failed_jwt_cookie_user_id', jwt_user_id_attribute_value)

return self._is_jwt_cookie_and_session_user_mismatch(request, jwt_user_id)

def _monitor_successful_jwt_cookie_and_session_user_mismatch(self, request, jwt_user_id):
"""
Provides monitoring when a successful JWT cookie and session user do not match.
Notes:
- Must only be called in the case of a successful JWT cookie.
- Also provides monitoring details for mismatches.
"""
self._is_jwt_cookie_and_session_user_mismatch(request, jwt_user_id)

def _is_jwt_cookie_and_session_user_mismatch(self, request, jwt_user_id):
"""
Returns True if JWT cookie and session user do not match, False otherwise.
Arguments:
request: The request.
jwt_user (User): The valid JWT user. If not user is supplied, it is assumed that
the cookie was invalid, and we attempt to get the user_id from the invalid
token.
jwt_user_id (int): The user_id of the JWT, None if not found.
Other notes:
- If ENABLE_JWT_VS_SESSION_USER_CHECK is toggled off, always return False.
Expand Down Expand Up @@ -270,17 +304,6 @@ def _is_jwt_cookie_and_session_user_mismatched(self, request, jwt_user=None):
else:
session_user_id = None

if jwt_user:
jwt_user_id = jwt_user.id
else:
cookie_token = JSONWebTokenAuthentication.get_token_from_cookies(request.COOKIES)
invalid_decoded_jwt = unsafe_jwt_decode_handler(cookie_token)
jwt_user_id = invalid_decoded_jwt['user_id']
# .. custom_attribute_name: invalid_jwt_cookie_user_id
# .. custom_attribute_description: The user_id pulled from the invalid/failed
# JWT cookie.
set_custom_attribute('invalid_jwt_cookie_user_id', jwt_user_id)

if not session_user_id or session_user_id == jwt_user_id:
return False

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,14 +342,48 @@ def test_authenticate_jwt_and_session_mismatch(self, mock_set_custom_attribute):
assert response.status_code == 200

@mock.patch('edx_rest_framework_extensions.auth.jwt.authentication.set_custom_attribute')
def test_authenticate_jwt_and_session_mismatch_invalid_cookie(self, mock_set_custom_attribute):
""" Tests monitoring for JWT cookie when there is a session user mismatch and invalid JWT cookie """
def test_authenticate_jwt_and_session_mismatch_bad_signature_cookie(self, mock_set_custom_attribute):
""" Tests monitoring for JWT cookie with a bad signature when there is a session user mismatch """
session_user_id = 111
session_user = factories.UserFactory(id=session_user_id)
jwt_user_id = 222
jwt_user = factories.UserFactory(id=jwt_user_id)
self.client.cookies = SimpleCookie({
jwt_cookie_name(): self._get_test_jwt_token(user=jwt_user, is_valid=False),
jwt_cookie_name(): self._get_test_jwt_token(user=jwt_user, is_valid_signature=False),
})

enable_forgiving_jwt_cookies = get_setting(ENABLE_FORGIVING_JWT_COOKIES)
with override_settings(
EDX_DRF_EXTENSIONS={
ENABLE_FORGIVING_JWT_COOKIES: enable_forgiving_jwt_cookies,
ENABLE_JWT_VS_SESSION_USER_CHECK: True,
},
MIDDLEWARE=(
'django.contrib.sessions.middleware.SessionMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
),
ROOT_URLCONF='edx_rest_framework_extensions.auth.jwt.tests.test_authentication',
):
self.client.force_login(session_user)
response = self.client.get(reverse('authenticated-view'))

mock_set_custom_attribute.assert_any_call('is_jwt_vs_session_user_check_enabled', True)
mock_set_custom_attribute.assert_any_call('jwt_auth_session_user_id', session_user_id)
mock_set_custom_attribute.assert_any_call('jwt_auth_and_session_user_mismatch', True)
mock_set_custom_attribute.assert_any_call('failed_jwt_cookie_user_id', jwt_user_id)
if enable_forgiving_jwt_cookies:
mock_set_custom_attribute.assert_any_call('jwt_auth_result', 'user-mismatch-failure')
else:
mock_set_custom_attribute.assert_any_call('jwt_auth_result', 'failed-cookie')
assert response.status_code == 401

@mock.patch('edx_rest_framework_extensions.auth.jwt.authentication.set_custom_attribute')
def test_authenticate_jwt_and_session_mismatch_invalid_cookie(self, mock_set_custom_attribute):
""" Tests monitoring for invalid JWT cookie when there is a session user mismatch """
session_user_id = 111
session_user = factories.UserFactory(id=session_user_id)
self.client.cookies = SimpleCookie({
jwt_cookie_name(): 'invalid-cookie',
})

enable_forgiving_jwt_cookies = get_setting(ENABLE_FORGIVING_JWT_COOKIES)
Expand All @@ -370,7 +404,7 @@ def test_authenticate_jwt_and_session_mismatch_invalid_cookie(self, mock_set_cus
mock_set_custom_attribute.assert_any_call('is_jwt_vs_session_user_check_enabled', True)
mock_set_custom_attribute.assert_any_call('jwt_auth_session_user_id', session_user_id)
mock_set_custom_attribute.assert_any_call('jwt_auth_and_session_user_mismatch', True)
mock_set_custom_attribute.assert_any_call('invalid_jwt_cookie_user_id', jwt_user_id)
mock_set_custom_attribute.assert_any_call('failed_jwt_cookie_user_id', 'decode-error')
if enable_forgiving_jwt_cookies:
mock_set_custom_attribute.assert_any_call('jwt_auth_result', 'user-mismatch-failure')
else:
Expand Down Expand Up @@ -456,11 +490,11 @@ def test_authenticate_jwt_and_session_mismatch_disabled(self, mock_set_custom_at
assert 'jwt_auth_and_session_user_mismatch' not in set_custom_attribute_keys
assert response.status_code == 200

def _get_test_jwt_token(self, user=None, is_valid=True):
def _get_test_jwt_token(self, user=None, is_valid_signature=True):
""" Returns a user and jwt token """
test_user = factories.UserFactory() if user is None else user
payload = generate_latest_version_payload(test_user)
if is_valid:
if is_valid_signature:
jwt_token = generate_jwt_token(payload)
else:
jwt_token = generate_jwt_token(payload, signing_key='invalid-key')
Expand Down

0 comments on commit f21c62d

Please sign in to comment.