Skip to content

Commit

Permalink
Merge pull request #106 from eduNEXT/jlc/nid-pipeline
Browse files Browse the repository at this point in the history
Jlc/nid pipeline
  • Loading branch information
johanseto authored Nov 14, 2023
2 parents e0eaca9 + 71a560d commit f9732ad
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 0 deletions.
12 changes: 12 additions & 0 deletions eox_nelp/third_party_auth/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
"""Custom NELP authentication Exceptions
"""


class EoxNelpAuthException(ValueError):
"""Auth process exception.
Inspired in https://github.com/eduNEXT/eox-tenant/blob/master/eox_tenant/pipeline.py#L6
"""

def __init__(self, backend, *args, **kwargs):
self.backend = backend
super().__init__(*args, **kwargs)
35 changes: 35 additions & 0 deletions eox_nelp/third_party_auth/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from django.contrib.auth import logout
from social_core.pipeline.social_auth import social_details as social_core_details

from eox_nelp.third_party_auth.exceptions import EoxNelpAuthException


def social_details(backend, details, response, *args, **kwargs):
"""This is an extension of `social_core.pipeline.social_auth.social_details` that allows
Expand Down Expand Up @@ -65,3 +67,36 @@ def close_mismatch_session(request, *args, user=None, **kwargs): # pylint: disa
logout(request)

return {}


def safer_associate_username_by_uid( # pylint: disable=unused-argument
backend, details, response, *args, user=None, **kwargs,
):
"""Pipeline to retrieve user if possible matching uid with the username of a user.
The uid is based in the configuration of the saml with the field `attr_user_permanent_id`:
https://github.com/python-social-auth/social-core/blob/master/social_core/backends/saml.py#L49
This is using the idp and the uid inspired in:
https://github.com/python-social-auth/social-core/blob/master/social_core/backends/saml.py#L296C23-L297
Raises:
EoxNelpAuthException: If someone tries to have staff or superuser permission using tpa.
Returns:
dict: Dict with user if matches, if not return None.
"""
if user:
return None

idp = backend.get_idp(response["idp_name"])
uid = idp.get_user_permanent_id(response["attributes"])
user_match = backend.strategy.storage.user.get_user(username=uid)

if not user_match:
return None
if user_match.is_staff or user_match.is_superuser:
raise EoxNelpAuthException(backend, "It is not allowed to auto associate staff or admin users")

return {
"user": user_match,
"is_new": False,
}
Empty file.
93 changes: 93 additions & 0 deletions eox_nelp/third_party_auth/tests/test_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
""" Test file for third_party_auth pipeline functions."""
from ddt import data, ddt
from django.contrib.auth import get_user_model
from django.test import TestCase
from mock import Mock

from eox_nelp.third_party_auth.exceptions import EoxNelpAuthException
from eox_nelp.third_party_auth.pipeline import safer_associate_username_by_uid

User = get_user_model()


@ddt
class SaferAssociaciateUsernameUidTestCase(TestCase):
"""Test class for safer_associate_username_by_uid method"""

def setUp(self): # pylint: disable=invalid-name
"""
Set base variables and objects across experience test cases.
"""
self.details = {}
self.response = {
"idp_name": "tpa-saml-sso",
"attributes": {},
}
self.user, _ = User.objects.get_or_create(username="vader")

def test_user_already_matched(self):
"""Test the pipeline method is called with already matched user.
Expected behavior:
- Return None.
"""
backend = Mock()

pipe_output = safer_associate_username_by_uid(backend, self.details, self.response, user=self.user)

self.assertEqual(None, pipe_output)

def test_user_not_associated(self):
"""Test the pipeline method try to match with uid but there is not user with that username.
Expected behavior:
- Pipe Return None.
- Strategy storage get_user method is called with desired params.
"""
test_uid = "1888222999"
backend = Mock()
backend.get_idp.return_value.get_user_permanent_id.return_value = test_uid
backend.strategy.storage.user.get_user.return_value = None

pipe_output = safer_associate_username_by_uid(backend, self.details, self.response)

self.assertEqual(None, pipe_output)
backend.strategy.storage.user.get_user.assert_called_with(username=test_uid)

@data(
{"is_staff": True, "username": "1222333444"},
{"is_superuser": True, "username": "1222333555"},
)
def test_staff_user_raise_exc(self, user_kwargs):
"""Test the pipeline method try to match with uid, the user with matched username exists
but is staff or superuser.
Expected behavior:
- Raise EoxNelpAuthException exception
- Strategy storage get_user method is called with desired params.
"""
test_uid = user_kwargs["username"]
past_user, _ = User.objects.get_or_create(**user_kwargs)
backend = Mock()
backend.get_idp.return_value.get_user_permanent_id.return_value = test_uid
backend.strategy.storage.user.get_user.return_value = past_user

self.assertRaises(EoxNelpAuthException, safer_associate_username_by_uid, backend, self.details, self.response)
backend.strategy.storage.user.get_user.assert_called_with(username=test_uid)

def test_user_associate_username_with_uid(self):
"""Test the pipeline method try to match with uid, the user with matched username exists
and is returned.
Expected behavior:
- Strategy storage get_user method is called with desired params.
- The method return the desirect with `user` and `is_new` keys.
"""
test_uid = "1777888999"
past_user, _ = User.objects.get_or_create(
username=test_uid,
)
backend = Mock()
backend.get_idp.return_value.get_user_permanent_id.return_value = test_uid
backend.strategy.storage.user.get_user.return_value = past_user

pipe_output = safer_associate_username_by_uid(backend, self.details, self.response)

self.assertEqual({"user": past_user, "is_new": False}, pipe_output)
backend.strategy.storage.user.get_user.assert_called_with(username=test_uid)

0 comments on commit f9732ad

Please sign in to comment.