diff --git a/individual/gql_mutations.py b/individual/gql_mutations.py index 6092604..5f93080 100644 --- a/individual/gql_mutations.py +++ b/individual/gql_mutations.py @@ -1,6 +1,7 @@ import graphene from django.core.exceptions import ValidationError from django.db import transaction +from django.db.models import Subquery from core.gql.gql_mutations.base_mutation import BaseHistoryModelDeleteMutationMixin, BaseMutation, \ BaseHistoryModelUpdateMutationMixin, BaseHistoryModelCreateMutationMixin @@ -150,6 +151,15 @@ def _validate_mutation(cls, user, **data): IndividualConfig.gql_individual_delete_perms): raise ValidationError("mutation.authentication_required") + villages_qs = Location.objects.filter(individual__id__in=data['ids'], type='V') + # must first check if villages_qs exists in case none of the individuals has location + if villages_qs.exists(): + allowed_loc_ids = Location.get_queryset(None, user).values('id') + not_in_allowed = villages_qs.exclude(id__in=Subquery(allowed_loc_ids)) + # all individuals' villages must be within permission for the given user + if not allowed_loc_ids.exists() or not_in_allowed.exists(): + raise ValidationError("mutation.authentication_required") + @classmethod def _mutate(cls, user, **data): if "client_mutation_id" in data: diff --git a/individual/tests/graphql_mutation_test.py b/individual/tests/graphql_mutation_test.py index b52701a..7c2d6e9 100644 --- a/individual/tests/graphql_mutation_test.py +++ b/individual/tests/graphql_mutation_test.py @@ -294,3 +294,126 @@ def test_update_individual_row_security(self): content = json.loads(response.content) id = content['data']['updateIndividual']['internalId'] self.assert_mutation_success(id) + + def test_delete_individual_general_permission(self): + individual1 = create_individual(self.admin_user.username) + individual2 = create_individual(self.admin_user.username) + query_str = f''' + mutation {{ + deleteIndividual( + input: {{ + ids: ["{individual1.id}", "{individual2.id}"] + }} + ) {{ + clientMutationId + internalId + }} + }} + ''' + + # Anonymous User has no permission + response = self.query(query_str) + + content = json.loads(response.content) + id = content['data']['deleteIndividual']['internalId'] + self.assert_mutation_error(id, 'mutation.authentication_required') + + # Health Enrollment Officier (role=1) has no permission + response = self.query( + query_str, + headers={"HTTP_AUTHORIZATION": f"Bearer {self.med_enroll_officer_token}"} + ) + content = json.loads(response.content) + id = content['data']['deleteIndividual']['internalId'] + self.assert_mutation_error(id, 'mutation.authentication_required') + + # IMIS admin can do everything + response = self.query( + query_str, + headers={"HTTP_AUTHORIZATION": f"Bearer {self.admin_token}"} + ) + content = json.loads(response.content) + id = content['data']['deleteIndividual']['internalId'] + self.assert_mutation_success(id) + + def test_delete_individual_row_security(self): + individual_a1 = create_individual( + self.admin_user.username, + payload_override={'village': self.village_a}, + ) + individual_a2 = create_individual( + self.admin_user.username, + payload_override={'village': self.village_a}, + ) + individual_b = create_individual( + self.admin_user.username, + payload_override={'village': self.village_b}, + ) + query_str = f''' + mutation {{ + deleteIndividual( + input: {{ + ids: ["{individual_a1.id}"] + }} + ) {{ + clientMutationId + internalId + }} + }} + ''' + + # SP officer B cannot delete individual for district A + response = self.query(query_str) + response = self.query( + query_str, + headers={"HTTP_AUTHORIZATION": f"Bearer {self.dist_b_user_token}"} + ) + self.assertResponseNoErrors(response) + + content = json.loads(response.content) + id = content['data']['deleteIndividual']['internalId'] + self.assert_mutation_error(id, 'mutation.authentication_required') + + # SP officer A can delete individual for district A + response = self.query( + query_str, + headers={"HTTP_AUTHORIZATION": f"Bearer {self.dist_a_user_token}"} + ) + content = json.loads(response.content) + id = content['data']['deleteIndividual']['internalId'] + self.assert_mutation_success(id) + + # SP officer B can delete individual without any district + individual_no_loc = create_individual(self.admin_user.username) + response = self.query( + query_str.replace( + str(individual_a1.id), + str(individual_no_loc.id) + ), headers={"HTTP_AUTHORIZATION": f"Bearer {self.dist_b_user_token}"} + ) + content = json.loads(response.content) + id = content['data']['deleteIndividual']['internalId'] + self.assert_mutation_success(id) + + # SP officer B cannot delete a mix of individuals from district A and district B + response = self.query( + query_str.replace( + f'["{individual_a1.id}"]', + f'["{individual_a1.id}", "{individual_b.id}"]' + ), headers={"HTTP_AUTHORIZATION": f"Bearer {self.dist_b_user_token}"} + ) + content = json.loads(response.content) + id = content['data']['deleteIndividual']['internalId'] + self.assert_mutation_error(id, 'mutation.authentication_required') + + # SP officer B can delete individual from district B + individual_no_loc = create_individual(self.admin_user.username) + response = self.query( + query_str.replace( + str(individual_a1.id), + str(individual_b.id) + ), headers={"HTTP_AUTHORIZATION": f"Bearer {self.dist_b_user_token}"} + ) + content = json.loads(response.content) + id = content['data']['deleteIndividual']['internalId'] + self.assert_mutation_success(id)