Skip to content

Commit

Permalink
Return user sets that are governed by idp in different cases
Browse files Browse the repository at this point in the history
  • Loading branch information
jingcheng16 committed Jun 27, 2024
1 parent 6ed8b72 commit 98f2d7e
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 26 deletions.
22 changes: 22 additions & 0 deletions corehq/apps/sso/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,28 @@ def get_all_usernames_of_the_idp(self):
else:
raise NotImplementedError("Not implemented")

def get_webuser_names_goverened_by_idp(self):
usernames_in_account = set(self.owner.get_web_user_usernames())

if self.login_enforcement_type == LoginEnforcementType.GLOBAL:
authenticated_domains = AuthenticatedEmailDomain.objects.filter(identity_provider=self)
exempt_usernames = UserExemptFromSingleSignOn.objects.filter(email_domain__in=authenticated_domains
).values_list('username', flat=True)
authenticated_email_domains = authenticated_domains.values_list('email_domain', flat=True)

usernames = []

for username in usernames_in_account:
if username not in exempt_usernames and (get_email_domain_from_username(username)
in authenticated_email_domains):
usernames.append(username)
return usernames

elif self.login_enforcement_type == LoginEnforcementType.TEST:
test_usernames = set(SsoTestUser.objects.filter(email_domain__identity_provider__slug=self.slug
).values_list('username', flat=True))
return list(test_usernames.intersection(usernames_in_account))


@receiver(post_save, sender=Subscription)
@receiver(post_delete, sender=Subscription)
Expand Down
35 changes: 9 additions & 26 deletions corehq/apps/sso/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,17 @@
from corehq.apps.hqwebapp.tasks import send_html_email_async
from corehq.apps.sso.exceptions import EntraVerificationFailed, EntraUnsupportedType
from corehq.apps.sso.models import (
AuthenticatedEmailDomain,
IdentityProvider,
IdentityProviderProtocol,
IdentityProviderType,
UserExemptFromSingleSignOn,
LoginEnforcementType,
)
from corehq.apps.sso.utils.context_helpers import (
get_api_secret_expiration_email_context,
get_idp_cert_expiration_email_context,
get_sso_deactivation_skip_email_context,
)
from corehq.apps.sso.utils.entra import MSGraphIssue
from corehq.apps.sso.utils.user_helpers import convert_emails_to_lowercase, get_email_domain_from_username
from corehq.apps.sso.utils.user_helpers import convert_emails_to_lowercase
from corehq.apps.users.models import WebUser
from corehq.apps.users.models import HQApiKey
from django.contrib.auth.models import User
Expand Down Expand Up @@ -129,10 +126,9 @@ def auto_deactivate_removed_sso_users():
for idp in IdentityProvider.objects.filter(
enable_user_deactivation=True,
idp_type=IdentityProviderType.ENTRA_ID,
login_enforcement_type=LoginEnforcementType.GLOBAL,
).all():
try:
usernames_in_idp = convert_emails_to_lowercase(idp.get_all_usernames_of_the_idp())
usernames_in_idp = set(convert_emails_to_lowercase(idp.get_all_usernames_of_the_idp()))
except EntraVerificationFailed as e:
notify_exception(None, f"Failed to get members of the IdP. {str(e)}")
send_deactivation_skipped_email(idp=idp, failure_code=MSGraphIssue.VERIFICATION_ERROR,
Expand All @@ -158,28 +154,15 @@ def auto_deactivate_removed_sso_users():
send_deactivation_skipped_email(idp=idp, failure_code=MSGraphIssue.EMPTY_ERROR)
continue

usernames_in_account = idp.owner.get_web_user_usernames()

# Get criteria for exempting usernames and email domains from the deactivation list
authenticated_domains = AuthenticatedEmailDomain.objects.filter(identity_provider=idp)
exempt_usernames = UserExemptFromSingleSignOn.objects.filter(email_domain__in=authenticated_domains
).values_list('username', flat=True)

usernames_to_deactivate = []
authenticated_email_domains = authenticated_domains.values_list('email_domain', flat=True)

for username in usernames_in_account:
if username not in usernames_in_idp and username not in exempt_usernames:
email_domain = get_email_domain_from_username(username)
if email_domain in authenticated_email_domains:
usernames_to_deactivate.append(username)
usernames_governed_by_idp = set(idp.get_webuser_names_goverened_by_idp())

# Deactivate user that is not returned by Graph Users API
for username in usernames_to_deactivate:
user = WebUser.get_by_username(username)
if user and user.is_active:
user.is_active = False
user.save()
for username in usernames_governed_by_idp:
if username not in usernames_in_idp:
user = WebUser.get_by_username(username)
if user and user.is_active:
user.is_active = False
user.save()


def send_deactivation_skipped_email(idp, failure_code, error=None, error_description=None):
Expand Down

0 comments on commit 98f2d7e

Please sign in to comment.