From 71a560de9791ce8823ac37f7f0b5aad1d7cf65ff Mon Sep 17 00:00:00 2001 From: Johan Castiblanco Date: Fri, 10 Nov 2023 16:28:39 -0500 Subject: [PATCH] feat: add pipeline to match user with uid This allow if the response send correct uid and there is a user with username equal, this match that user with using that uid. feat: add tests for the pipe safer associate This add tests for the pipe method `safer_associate_username_by_uid` feat: separate exceptions file Due pr recomendations is better to separate responsabilities of each file. Exception file for tpa module created. refactor: use ddt to check staff ^ superuser tests --- eox_nelp/third_party_auth/exceptions.py | 12 +++ eox_nelp/third_party_auth/pipeline.py | 35 +++++++ eox_nelp/third_party_auth/tests/__init__.py | 0 .../third_party_auth/tests/test_pipeline.py | 93 +++++++++++++++++++ 4 files changed, 140 insertions(+) create mode 100644 eox_nelp/third_party_auth/exceptions.py create mode 100644 eox_nelp/third_party_auth/tests/__init__.py create mode 100644 eox_nelp/third_party_auth/tests/test_pipeline.py diff --git a/eox_nelp/third_party_auth/exceptions.py b/eox_nelp/third_party_auth/exceptions.py new file mode 100644 index 00000000..9bf26322 --- /dev/null +++ b/eox_nelp/third_party_auth/exceptions.py @@ -0,0 +1,12 @@ +"""Custom NELP authentication Exceptions +""" + + +class EoxNelpAuthException(ValueError): + """Auth process exception. + Inspired in https://github.com/eduNEXT/eox-tenant/blob/master/eox_tenant/pipeline.py#L6 + """ + + def __init__(self, backend, *args, **kwargs): + self.backend = backend + super().__init__(*args, **kwargs) diff --git a/eox_nelp/third_party_auth/pipeline.py b/eox_nelp/third_party_auth/pipeline.py index ba8b4923..fa43c2fe 100644 --- a/eox_nelp/third_party_auth/pipeline.py +++ b/eox_nelp/third_party_auth/pipeline.py @@ -8,6 +8,8 @@ from django.contrib.auth import logout from social_core.pipeline.social_auth import social_details as social_core_details +from eox_nelp.third_party_auth.exceptions import EoxNelpAuthException + def social_details(backend, details, response, *args, **kwargs): """This is an extension of `social_core.pipeline.social_auth.social_details` that allows @@ -65,3 +67,36 @@ def close_mismatch_session(request, *args, user=None, **kwargs): # pylint: disa logout(request) return {} + + +def safer_associate_username_by_uid( # pylint: disable=unused-argument + backend, details, response, *args, user=None, **kwargs, +): + """Pipeline to retrieve user if possible matching uid with the username of a user. + The uid is based in the configuration of the saml with the field `attr_user_permanent_id`: + https://github.com/python-social-auth/social-core/blob/master/social_core/backends/saml.py#L49 + + This is using the idp and the uid inspired in: + https://github.com/python-social-auth/social-core/blob/master/social_core/backends/saml.py#L296C23-L297 + Raises: + EoxNelpAuthException: If someone tries to have staff or superuser permission using tpa. + + Returns: + dict: Dict with user if matches, if not return None. + """ + if user: + return None + + idp = backend.get_idp(response["idp_name"]) + uid = idp.get_user_permanent_id(response["attributes"]) + user_match = backend.strategy.storage.user.get_user(username=uid) + + if not user_match: + return None + if user_match.is_staff or user_match.is_superuser: + raise EoxNelpAuthException(backend, "It is not allowed to auto associate staff or admin users") + + return { + "user": user_match, + "is_new": False, + } diff --git a/eox_nelp/third_party_auth/tests/__init__.py b/eox_nelp/third_party_auth/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/eox_nelp/third_party_auth/tests/test_pipeline.py b/eox_nelp/third_party_auth/tests/test_pipeline.py new file mode 100644 index 00000000..14933589 --- /dev/null +++ b/eox_nelp/third_party_auth/tests/test_pipeline.py @@ -0,0 +1,93 @@ +""" Test file for third_party_auth pipeline functions.""" +from ddt import data, ddt +from django.contrib.auth import get_user_model +from django.test import TestCase +from mock import Mock + +from eox_nelp.third_party_auth.exceptions import EoxNelpAuthException +from eox_nelp.third_party_auth.pipeline import safer_associate_username_by_uid + +User = get_user_model() + + +@ddt +class SaferAssociaciateUsernameUidTestCase(TestCase): + """Test class for safer_associate_username_by_uid method""" + + def setUp(self): # pylint: disable=invalid-name + """ + Set base variables and objects across experience test cases. + """ + self.details = {} + self.response = { + "idp_name": "tpa-saml-sso", + "attributes": {}, + } + self.user, _ = User.objects.get_or_create(username="vader") + + def test_user_already_matched(self): + """Test the pipeline method is called with already matched user. + Expected behavior: + - Return None. + """ + backend = Mock() + + pipe_output = safer_associate_username_by_uid(backend, self.details, self.response, user=self.user) + + self.assertEqual(None, pipe_output) + + def test_user_not_associated(self): + """Test the pipeline method try to match with uid but there is not user with that username. + Expected behavior: + - Pipe Return None. + - Strategy storage get_user method is called with desired params. + """ + test_uid = "1888222999" + backend = Mock() + backend.get_idp.return_value.get_user_permanent_id.return_value = test_uid + backend.strategy.storage.user.get_user.return_value = None + + pipe_output = safer_associate_username_by_uid(backend, self.details, self.response) + + self.assertEqual(None, pipe_output) + backend.strategy.storage.user.get_user.assert_called_with(username=test_uid) + + @data( + {"is_staff": True, "username": "1222333444"}, + {"is_superuser": True, "username": "1222333555"}, + ) + def test_staff_user_raise_exc(self, user_kwargs): + """Test the pipeline method try to match with uid, the user with matched username exists + but is staff or superuser. + Expected behavior: + - Raise EoxNelpAuthException exception + - Strategy storage get_user method is called with desired params. + """ + test_uid = user_kwargs["username"] + past_user, _ = User.objects.get_or_create(**user_kwargs) + backend = Mock() + backend.get_idp.return_value.get_user_permanent_id.return_value = test_uid + backend.strategy.storage.user.get_user.return_value = past_user + + self.assertRaises(EoxNelpAuthException, safer_associate_username_by_uid, backend, self.details, self.response) + backend.strategy.storage.user.get_user.assert_called_with(username=test_uid) + + def test_user_associate_username_with_uid(self): + """Test the pipeline method try to match with uid, the user with matched username exists + and is returned. + Expected behavior: + - Strategy storage get_user method is called with desired params. + - The method return the desirect with `user` and `is_new` keys. + """ + test_uid = "1777888999" + past_user, _ = User.objects.get_or_create( + username=test_uid, + ) + backend = Mock() + backend.get_idp.return_value.get_user_permanent_id.return_value = test_uid + backend.strategy.storage.user.get_user.return_value = past_user + + pipe_output = safer_associate_username_by_uid(backend, self.details, self.response) + + self.assertEqual({"user": past_user, "is_new": False}, pipe_output) + backend.strategy.storage.user.get_user.assert_called_with(username=test_uid)