Skip to content

Commit

Permalink
feat: create a new pipeline step that validates the national id base … (
Browse files Browse the repository at this point in the history
#233)

* feat: create a new pipeline step that validates the national id base on the uid suffix

* feat: add logger to pipe
  • Loading branch information
andrey-canon authored Nov 18, 2024
1 parent fc47d6d commit e214a56
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 1 deletion.
38 changes: 38 additions & 0 deletions eox_nelp/third_party_auth/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,21 @@
social_details: Allows to map response fields to user standard fields.
invalidate_current_user: Sets to None the current user.
"""
import logging

from django.conf import settings
from django.contrib.auth import logout
from django.http import HttpResponseForbidden
from django.shortcuts import redirect
from django.utils.translation import gettext_lazy as _
from social_core.pipeline.social_auth import associate_user
from social_core.pipeline.social_auth import social_details as social_core_details

from eox_nelp.edxapp_wrapper.edxmako import edxmako
from eox_nelp.third_party_auth.utils import match_user_using_uid_query

logger = logging.getLogger(__name__)


def social_details(backend, details, response, *args, **kwargs):
"""This is an extension of `social_core.pipeline.social_auth.social_details` that allows
Expand Down Expand Up @@ -143,3 +149,35 @@ def disallow_staff_superuser_users( # pylint: disable=unused-argument
)
)
return {}


def validate_national_id_and_associate_user(request, backend, uid, *args, user=None, social=None, **kwargs):
"""
Validates the user's national ID against the provided SAML UID before associating
a SAML identity with a Django user. If validation fails, the session is ended, and
the user is redirected to registration.
Args:
request (HttpRequest): The HTTP request object.
backend: The authentication backend used, such as SAML.
uid (str): Unique identifier from SAML (e.g., user ID).
user (User, optional): Django user instance, if found.
social (optional): Existing social authentication data, if found.
Returns:
If the UID validation succeeds, proceeds to associate the user with social auth.
Otherwise, logs out the current session and redirects to the registration page.
"""
national_id = user.extrainfo.national_id if user and hasattr(user, "extrainfo") else ""

if national_id and uid.endswith(national_id):
return associate_user(backend, uid, user, social, *args, **kwargs)

logger.warning(
"User association failed: UID does not end with the user's national ID. UID: %s, National ID: %s",
uid,
national_id,
)
logout(request)

return redirect("/register")
60 changes: 59 additions & 1 deletion eox_nelp/third_party_auth/tests/test_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,27 @@
""" Test file for third_party_auth pipeline functions."""
from custom_reg_form.models import ExtraInfo
from ddt import data, ddt
from django.contrib.auth import get_user_model
from django.core.exceptions import MultipleObjectsReturned
from django.http import HttpResponseForbidden
from django.test import TestCase
from mock import Mock
from mock import Mock, patch
from rest_framework import status

from eox_nelp.third_party_auth import utils
from eox_nelp.third_party_auth.pipeline import (
disallow_staff_superuser_users,
safer_associate_user_by_national_id,
safer_associate_user_by_social_auth_record,
validate_national_id_and_associate_user,
)

User = get_user_model()


class SetUpPipeMixin:
"""Mixin for SetUp pipelines"""

def setUp(self): # pylint: disable=invalid-name
"""
Set base variables and objects across experience test cases.
Expand All @@ -40,6 +43,7 @@ def tearDown(self): # pylint: disable=invalid-name

class SaferAssociateUserUsingUid(SetUpPipeMixin):
"""Mixin for pipes that match user using uid"""

# pylint: disable=no-member
def test_user_already_matched(self):
"""Test the pipeline method is called with already matched user.
Expand Down Expand Up @@ -199,3 +203,57 @@ def test_not_staff_or_superuser_user(self):
)

self.assertDictEqual({}, pipe_output)


class ValidateNationalIdAndAssociateUserTestCase(SetUpPipeMixin, TestCase):
"""Test case for `validate_national_id_and_associate_user` pipeline"""

def test_associate_user_when_national_id_matches_uid(self):
"""Test that the user is associated if the national ID matches the UID suffix.
Expected behavior:
- Calls the original social_core_associate_user function.
"""
self.user.extrainfo = ExtraInfo(arabic_name="فيدر", national_id="1234567890")
test_uid = "test-saml:1234567890"
social_auth = Mock(user=self.user)
self.backend.strategy.storage.user.create_social_auth.return_value = social_auth

pipe_output = validate_national_id_and_associate_user(self.request, self.backend, test_uid, user=self.user)

self.assertEqual(pipe_output, {"user": self.user, "new_association": True, "social": social_auth})

@patch("eox_nelp.third_party_auth.pipeline.logout")
def test_redirect_to_register_when_national_id_does_not_match(self, mock_logout):
"""Test that user is logged out and redirected if the national ID doesn't match UID.
Expected behavior:
- Calls logout on the request.
- Returns redirect with status code 302
- Redirects to the registration page.
"""
self.user.extrainfo = ExtraInfo(arabic_name="فيدر", national_id="1452487963")
test_uid = "test-saml:1234567890"

pipe_output = validate_national_id_and_associate_user(self.request, self.backend, test_uid, user=self.user)

mock_logout.assert_called_once_with(self.request)
self.assertEqual(pipe_output.status_code, 302)
self.assertEqual(pipe_output.url, "/register")

@patch("eox_nelp.third_party_auth.pipeline.logout")
def test_redirect_when_user_has_no_national_id(self, mock_logout):
"""Test that user is redirected if no national ID is found on user.
Expected behavior:
- Calls logout on the request.
- Returns redirect with status code 302
- Redirects to the registration page.
"""
setattr(self.user, "extrainfo", None)
test_uid = "1777888999"

pipe_output = validate_national_id_and_associate_user(self.request, self.backend, test_uid, user=self.user)

mock_logout.assert_called_once_with(self.request)
self.assertEqual(pipe_output.status_code, 302)
self.assertEqual(pipe_output.url, "/register")

0 comments on commit e214a56

Please sign in to comment.