Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: create a new pipeline step that validates the national id base … #233

Merged
merged 2 commits into from
Nov 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions eox_nelp/third_party_auth/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,21 @@
social_details: Allows to map response fields to user standard fields.
invalidate_current_user: Sets to None the current user.
"""
import logging

from django.conf import settings
from django.contrib.auth import logout
from django.http import HttpResponseForbidden
from django.shortcuts import redirect
from django.utils.translation import gettext_lazy as _
from social_core.pipeline.social_auth import associate_user
from social_core.pipeline.social_auth import social_details as social_core_details

from eox_nelp.edxapp_wrapper.edxmako import edxmako
from eox_nelp.third_party_auth.utils import match_user_using_uid_query

logger = logging.getLogger(__name__)


def social_details(backend, details, response, *args, **kwargs):
"""This is an extension of `social_core.pipeline.social_auth.social_details` that allows
Expand Down Expand Up @@ -143,3 +149,35 @@ def disallow_staff_superuser_users( # pylint: disable=unused-argument
)
)
return {}


def validate_national_id_and_associate_user(request, backend, uid, *args, user=None, social=None, **kwargs):
"""
Validates the user's national ID against the provided SAML UID before associating
a SAML identity with a Django user. If validation fails, the session is ended, and
the user is redirected to registration.

Args:
request (HttpRequest): The HTTP request object.
backend: The authentication backend used, such as SAML.
uid (str): Unique identifier from SAML (e.g., user ID).
user (User, optional): Django user instance, if found.
social (optional): Existing social authentication data, if found.

Returns:
If the UID validation succeeds, proceeds to associate the user with social auth.
Otherwise, logs out the current session and redirects to the registration page.
"""
national_id = user.extrainfo.national_id if user and hasattr(user, "extrainfo") else ""

if national_id and uid.endswith(national_id):
return associate_user(backend, uid, user, social, *args, **kwargs)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a log here to know that this pipeline is working blocking incorrect associate users...
eg like this log
image

logger.warning(
"User association failed: UID does not end with the user's national ID. UID: %s, National ID: %s",
uid,
national_id,
)
logout(request)

return redirect("/register")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is a redirect. Do you think this pipe would have @partial decorator? To continue the process in this step pipe???
https://python-social-auth.readthedocs.io/en/latest/pipeline.html#partial-pipeline

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That decorator doesn't fit in this context at all, when I was developing this pipe I included this backend.strategy.clean_partial_pipeline(partial_token) to restart the current partial then I realized that is not necessary because logout(request) performs that operation, basically this pipe aims to restart the whole pipeline when the national id condition is not valid in order to start a new process so doesn't make sense to start a new process from this step

60 changes: 59 additions & 1 deletion eox_nelp/third_party_auth/tests/test_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,27 @@
""" Test file for third_party_auth pipeline functions."""
from custom_reg_form.models import ExtraInfo
from ddt import data, ddt
from django.contrib.auth import get_user_model
from django.core.exceptions import MultipleObjectsReturned
from django.http import HttpResponseForbidden
from django.test import TestCase
from mock import Mock
from mock import Mock, patch
from rest_framework import status

from eox_nelp.third_party_auth import utils
from eox_nelp.third_party_auth.pipeline import (
disallow_staff_superuser_users,
safer_associate_user_by_national_id,
safer_associate_user_by_social_auth_record,
validate_national_id_and_associate_user,
)

User = get_user_model()


class SetUpPipeMixin:
"""Mixin for SetUp pipelines"""

def setUp(self): # pylint: disable=invalid-name
"""
Set base variables and objects across experience test cases.
Expand All @@ -40,6 +43,7 @@ def tearDown(self): # pylint: disable=invalid-name

class SaferAssociateUserUsingUid(SetUpPipeMixin):
"""Mixin for pipes that match user using uid"""

# pylint: disable=no-member
def test_user_already_matched(self):
"""Test the pipeline method is called with already matched user.
Expand Down Expand Up @@ -199,3 +203,57 @@ def test_not_staff_or_superuser_user(self):
)

self.assertDictEqual({}, pipe_output)


class ValidateNationalIdAndAssociateUserTestCase(SetUpPipeMixin, TestCase):
"""Test case for `validate_national_id_and_associate_user` pipeline"""

def test_associate_user_when_national_id_matches_uid(self):
"""Test that the user is associated if the national ID matches the UID suffix.

Expected behavior:
- Calls the original social_core_associate_user function.
"""
self.user.extrainfo = ExtraInfo(arabic_name="فيدر", national_id="1234567890")
test_uid = "test-saml:1234567890"
social_auth = Mock(user=self.user)
self.backend.strategy.storage.user.create_social_auth.return_value = social_auth

pipe_output = validate_national_id_and_associate_user(self.request, self.backend, test_uid, user=self.user)

self.assertEqual(pipe_output, {"user": self.user, "new_association": True, "social": social_auth})

@patch("eox_nelp.third_party_auth.pipeline.logout")
def test_redirect_to_register_when_national_id_does_not_match(self, mock_logout):
"""Test that user is logged out and redirected if the national ID doesn't match UID.

Expected behavior:
- Calls logout on the request.
- Returns redirect with status code 302
- Redirects to the registration page.
"""
self.user.extrainfo = ExtraInfo(arabic_name="فيدر", national_id="1452487963")
test_uid = "test-saml:1234567890"

pipe_output = validate_national_id_and_associate_user(self.request, self.backend, test_uid, user=self.user)

mock_logout.assert_called_once_with(self.request)
self.assertEqual(pipe_output.status_code, 302)
self.assertEqual(pipe_output.url, "/register")

@patch("eox_nelp.third_party_auth.pipeline.logout")
def test_redirect_when_user_has_no_national_id(self, mock_logout):
"""Test that user is redirected if no national ID is found on user.
Expected behavior:
- Calls logout on the request.
- Returns redirect with status code 302
- Redirects to the registration page.
"""
setattr(self.user, "extrainfo", None)
test_uid = "1777888999"

pipe_output = validate_national_id_and_associate_user(self.request, self.backend, test_uid, user=self.user)

mock_logout.assert_called_once_with(self.request)
self.assertEqual(pipe_output.status_code, 302)
self.assertEqual(pipe_output.url, "/register")
Loading