Skip to content

Commit

Permalink
Add row-level security to individual delete query
Browse files Browse the repository at this point in the history
  • Loading branch information
weilu committed Sep 15, 2024
1 parent 8cd6b76 commit 27a5d33
Show file tree
Hide file tree
Showing 2 changed files with 133 additions and 0 deletions.
10 changes: 10 additions & 0 deletions individual/gql_mutations.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import graphene
from django.core.exceptions import ValidationError
from django.db import transaction
from django.db.models import Subquery

from core.gql.gql_mutations.base_mutation import BaseHistoryModelDeleteMutationMixin, BaseMutation, \
BaseHistoryModelUpdateMutationMixin, BaseHistoryModelCreateMutationMixin
Expand Down Expand Up @@ -150,6 +151,15 @@ def _validate_mutation(cls, user, **data):
IndividualConfig.gql_individual_delete_perms):
raise ValidationError("mutation.authentication_required")

villages_qs = Location.objects.filter(individual__id__in=data['ids'], type='V')
# must first check if villages_qs exists in case none of the individuals has location
if villages_qs.exists():
allowed_loc_ids = Location.get_queryset(None, user).values('id')
not_in_allowed = villages_qs.exclude(id__in=Subquery(allowed_loc_ids))
# all individuals' villages must be within permission for the given user
if not allowed_loc_ids.exists() or not_in_allowed.exists():
raise ValidationError("mutation.authentication_required")

@classmethod
def _mutate(cls, user, **data):
if "client_mutation_id" in data:
Expand Down
123 changes: 123 additions & 0 deletions individual/tests/graphql_mutation_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,3 +294,126 @@ def test_update_individual_row_security(self):
content = json.loads(response.content)
id = content['data']['updateIndividual']['internalId']
self.assert_mutation_success(id)

def test_delete_individual_general_permission(self):
individual1 = create_individual(self.admin_user.username)
individual2 = create_individual(self.admin_user.username)
query_str = f'''
mutation {{
deleteIndividual(
input: {{
ids: ["{individual1.id}", "{individual2.id}"]
}}
) {{
clientMutationId
internalId
}}
}}
'''

# Anonymous User has no permission
response = self.query(query_str)

content = json.loads(response.content)
id = content['data']['deleteIndividual']['internalId']
self.assert_mutation_error(id, 'mutation.authentication_required')

# Health Enrollment Officier (role=1) has no permission
response = self.query(
query_str,
headers={"HTTP_AUTHORIZATION": f"Bearer {self.med_enroll_officer_token}"}
)
content = json.loads(response.content)
id = content['data']['deleteIndividual']['internalId']
self.assert_mutation_error(id, 'mutation.authentication_required')

# IMIS admin can do everything
response = self.query(
query_str,
headers={"HTTP_AUTHORIZATION": f"Bearer {self.admin_token}"}
)
content = json.loads(response.content)
id = content['data']['deleteIndividual']['internalId']
self.assert_mutation_success(id)

def test_delete_individual_row_security(self):
individual_a1 = create_individual(
self.admin_user.username,
payload_override={'village': self.village_a},
)
individual_a2 = create_individual(
self.admin_user.username,
payload_override={'village': self.village_a},
)
individual_b = create_individual(
self.admin_user.username,
payload_override={'village': self.village_b},
)
query_str = f'''
mutation {{
deleteIndividual(
input: {{
ids: ["{individual_a1.id}"]
}}
) {{
clientMutationId
internalId
}}
}}
'''

# SP officer B cannot delete individual for district A
response = self.query(query_str)
response = self.query(
query_str,
headers={"HTTP_AUTHORIZATION": f"Bearer {self.dist_b_user_token}"}
)
self.assertResponseNoErrors(response)

content = json.loads(response.content)
id = content['data']['deleteIndividual']['internalId']
self.assert_mutation_error(id, 'mutation.authentication_required')

# SP officer A can delete individual for district A
response = self.query(
query_str,
headers={"HTTP_AUTHORIZATION": f"Bearer {self.dist_a_user_token}"}
)
content = json.loads(response.content)
id = content['data']['deleteIndividual']['internalId']
self.assert_mutation_success(id)

# SP officer B can delete individual without any district
individual_no_loc = create_individual(self.admin_user.username)
response = self.query(
query_str.replace(
str(individual_a1.id),
str(individual_no_loc.id)
), headers={"HTTP_AUTHORIZATION": f"Bearer {self.dist_b_user_token}"}
)
content = json.loads(response.content)
id = content['data']['deleteIndividual']['internalId']
self.assert_mutation_success(id)

# SP officer B cannot delete a mix of individuals from district A and district B
response = self.query(
query_str.replace(
f'["{individual_a1.id}"]',
f'["{individual_a1.id}", "{individual_b.id}"]'
), headers={"HTTP_AUTHORIZATION": f"Bearer {self.dist_b_user_token}"}
)
content = json.loads(response.content)
id = content['data']['deleteIndividual']['internalId']
self.assert_mutation_error(id, 'mutation.authentication_required')

# SP officer B can delete individual from district B
individual_no_loc = create_individual(self.admin_user.username)
response = self.query(
query_str.replace(
str(individual_a1.id),
str(individual_b.id)
), headers={"HTTP_AUTHORIZATION": f"Bearer {self.dist_b_user_token}"}
)
content = json.loads(response.content)
id = content['data']['deleteIndividual']['internalId']
self.assert_mutation_success(id)

0 comments on commit 27a5d33

Please sign in to comment.