From 98f2d7e5f10a7984f504923e2ef53f6242628212 Mon Sep 17 00:00:00 2001 From: Jing Cheng Date: Thu, 27 Jun 2024 11:48:42 -0400 Subject: [PATCH] Return user sets that are governed by idp in different cases --- corehq/apps/sso/models.py | 22 ++++++++++++++++++++++ corehq/apps/sso/tasks.py | 35 +++++++++-------------------------- 2 files changed, 31 insertions(+), 26 deletions(-) diff --git a/corehq/apps/sso/models.py b/corehq/apps/sso/models.py index ef7b16488d77..0c0fb45dc27e 100644 --- a/corehq/apps/sso/models.py +++ b/corehq/apps/sso/models.py @@ -445,6 +445,28 @@ def get_all_usernames_of_the_idp(self): else: raise NotImplementedError("Not implemented") + def get_webuser_names_goverened_by_idp(self): + usernames_in_account = set(self.owner.get_web_user_usernames()) + + if self.login_enforcement_type == LoginEnforcementType.GLOBAL: + authenticated_domains = AuthenticatedEmailDomain.objects.filter(identity_provider=self) + exempt_usernames = UserExemptFromSingleSignOn.objects.filter(email_domain__in=authenticated_domains + ).values_list('username', flat=True) + authenticated_email_domains = authenticated_domains.values_list('email_domain', flat=True) + + usernames = [] + + for username in usernames_in_account: + if username not in exempt_usernames and (get_email_domain_from_username(username) + in authenticated_email_domains): + usernames.append(username) + return usernames + + elif self.login_enforcement_type == LoginEnforcementType.TEST: + test_usernames = set(SsoTestUser.objects.filter(email_domain__identity_provider__slug=self.slug + ).values_list('username', flat=True)) + return list(test_usernames.intersection(usernames_in_account)) + @receiver(post_save, sender=Subscription) @receiver(post_delete, sender=Subscription) diff --git a/corehq/apps/sso/tasks.py b/corehq/apps/sso/tasks.py index 6dabe2324a4d..eaf5d869ae98 100644 --- a/corehq/apps/sso/tasks.py +++ b/corehq/apps/sso/tasks.py @@ -8,12 +8,9 @@ from corehq.apps.hqwebapp.tasks import send_html_email_async from corehq.apps.sso.exceptions import EntraVerificationFailed, EntraUnsupportedType from corehq.apps.sso.models import ( - AuthenticatedEmailDomain, IdentityProvider, IdentityProviderProtocol, IdentityProviderType, - UserExemptFromSingleSignOn, - LoginEnforcementType, ) from corehq.apps.sso.utils.context_helpers import ( get_api_secret_expiration_email_context, @@ -21,7 +18,7 @@ get_sso_deactivation_skip_email_context, ) from corehq.apps.sso.utils.entra import MSGraphIssue -from corehq.apps.sso.utils.user_helpers import convert_emails_to_lowercase, get_email_domain_from_username +from corehq.apps.sso.utils.user_helpers import convert_emails_to_lowercase from corehq.apps.users.models import WebUser from corehq.apps.users.models import HQApiKey from django.contrib.auth.models import User @@ -129,10 +126,9 @@ def auto_deactivate_removed_sso_users(): for idp in IdentityProvider.objects.filter( enable_user_deactivation=True, idp_type=IdentityProviderType.ENTRA_ID, - login_enforcement_type=LoginEnforcementType.GLOBAL, ).all(): try: - usernames_in_idp = convert_emails_to_lowercase(idp.get_all_usernames_of_the_idp()) + usernames_in_idp = set(convert_emails_to_lowercase(idp.get_all_usernames_of_the_idp())) except EntraVerificationFailed as e: notify_exception(None, f"Failed to get members of the IdP. {str(e)}") send_deactivation_skipped_email(idp=idp, failure_code=MSGraphIssue.VERIFICATION_ERROR, @@ -158,28 +154,15 @@ def auto_deactivate_removed_sso_users(): send_deactivation_skipped_email(idp=idp, failure_code=MSGraphIssue.EMPTY_ERROR) continue - usernames_in_account = idp.owner.get_web_user_usernames() - - # Get criteria for exempting usernames and email domains from the deactivation list - authenticated_domains = AuthenticatedEmailDomain.objects.filter(identity_provider=idp) - exempt_usernames = UserExemptFromSingleSignOn.objects.filter(email_domain__in=authenticated_domains - ).values_list('username', flat=True) - - usernames_to_deactivate = [] - authenticated_email_domains = authenticated_domains.values_list('email_domain', flat=True) - - for username in usernames_in_account: - if username not in usernames_in_idp and username not in exempt_usernames: - email_domain = get_email_domain_from_username(username) - if email_domain in authenticated_email_domains: - usernames_to_deactivate.append(username) + usernames_governed_by_idp = set(idp.get_webuser_names_goverened_by_idp()) # Deactivate user that is not returned by Graph Users API - for username in usernames_to_deactivate: - user = WebUser.get_by_username(username) - if user and user.is_active: - user.is_active = False - user.save() + for username in usernames_governed_by_idp: + if username not in usernames_in_idp: + user = WebUser.get_by_username(username) + if user and user.is_active: + user.is_active = False + user.save() def send_deactivation_skipped_email(idp, failure_code, error=None, error_description=None):