Skip to content

Commit

Permalink
Add row-level security to individual undo delete query
Browse files Browse the repository at this point in the history
  • Loading branch information
weilu committed Sep 15, 2024
1 parent 27a5d33 commit 9b814cb
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 0 deletions.
9 changes: 9 additions & 0 deletions individual/gql_mutations.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,15 @@ def _validate_mutation(cls, user, **data):
IndividualConfig.gql_individual_undo_delete_perms):
raise ValidationError("mutation.authentication_required")

villages_qs = Location.objects.filter(individual__id__in=data['ids'], type='V')
# must first check if villages_qs exists in case none of the individuals has location
if villages_qs.exists():
allowed_loc_ids = Location.get_queryset(None, user).values('id')
not_in_allowed = villages_qs.exclude(id__in=Subquery(allowed_loc_ids))
# all individuals' villages must be within permission for the given user
if not allowed_loc_ids.exists() or not_in_allowed.exists():
raise ValidationError("mutation.authentication_required")

@classmethod
def _mutate(cls, user, **data):
if "client_mutation_id" in data:
Expand Down
109 changes: 109 additions & 0 deletions individual/tests/graphql_mutation_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ def test_update_individual_row_security(self):
id = content['data']['updateIndividual']['internalId']
self.assert_mutation_success(id)


def test_delete_individual_general_permission(self):
individual1 = create_individual(self.admin_user.username)
individual2 = create_individual(self.admin_user.username)
Expand Down Expand Up @@ -336,6 +337,46 @@ def test_delete_individual_general_permission(self):
id = content['data']['deleteIndividual']['internalId']
self.assert_mutation_success(id)

# same for undo delete
query_str = f'''
mutation {{
undoDeleteIndividual(
input: {{
ids: ["{individual1.id}", "{individual2.id}"]
}}
) {{
clientMutationId
internalId
}}
}}
'''

# Anonymous User has no permission
response = self.query(query_str)

content = json.loads(response.content)
id = content['data']['undoDeleteIndividual']['internalId']
self.assert_mutation_error(id, 'mutation.authentication_required')

# Health Enrollment Officier (role=1) has no permission
response = self.query(
query_str,
headers={"HTTP_AUTHORIZATION": f"Bearer {self.med_enroll_officer_token}"}
)
content = json.loads(response.content)
id = content['data']['undoDeleteIndividual']['internalId']
self.assert_mutation_error(id, 'mutation.authentication_required')

# IMIS admin can do everything
response = self.query(
query_str,
headers={"HTTP_AUTHORIZATION": f"Bearer {self.admin_token}"}
)
content = json.loads(response.content)
id = content['data']['undoDeleteIndividual']['internalId']
self.assert_mutation_success(id)


def test_delete_individual_row_security(self):
individual_a1 = create_individual(
self.admin_user.username,
Expand Down Expand Up @@ -417,3 +458,71 @@ def test_delete_individual_row_security(self):
content = json.loads(response.content)
id = content['data']['deleteIndividual']['internalId']
self.assert_mutation_success(id)

# same for undo delete
query_str = f'''
mutation {{
undoDeleteIndividual(
input: {{
ids: ["{individual_a1.id}"]
}}
) {{
clientMutationId
internalId
}}
}}
'''

# SP officer B cannot undelete individual for district A
response = self.query(query_str)
response = self.query(
query_str,
headers={"HTTP_AUTHORIZATION": f"Bearer {self.dist_b_user_token}"}
)
self.assertResponseNoErrors(response)

content = json.loads(response.content)
id = content['data']['undoDeleteIndividual']['internalId']
self.assert_mutation_error(id, 'mutation.authentication_required')

# SP officer A can undelete individual for district A
response = self.query(
query_str,
headers={"HTTP_AUTHORIZATION": f"Bearer {self.dist_a_user_token}"}
)
content = json.loads(response.content)
id = content['data']['undoDeleteIndividual']['internalId']
self.assert_mutation_success(id)

# SP officer B can undelete individual without any district
response = self.query(
query_str.replace(
str(individual_a1.id),
str(individual_no_loc.id)
), headers={"HTTP_AUTHORIZATION": f"Bearer {self.dist_b_user_token}"}
)
content = json.loads(response.content)
id = content['data']['undoDeleteIndividual']['internalId']
self.assert_mutation_success(id)

# SP officer B cannot undelete a mix of individuals from district A and district B
response = self.query(
query_str.replace(
f'["{individual_a1.id}"]',
f'["{individual_a1.id}", "{individual_b.id}"]'
), headers={"HTTP_AUTHORIZATION": f"Bearer {self.dist_b_user_token}"}
)
content = json.loads(response.content)
id = content['data']['undoDeleteIndividual']['internalId']
self.assert_mutation_error(id, 'mutation.authentication_required')

# SP officer B can undelete individual from district B
response = self.query(
query_str.replace(
str(individual_a1.id),
str(individual_b.id)
), headers={"HTTP_AUTHORIZATION": f"Bearer {self.dist_b_user_token}"}
)
content = json.loads(response.content)
id = content['data']['undoDeleteIndividual']['internalId']
self.assert_mutation_success(id)

0 comments on commit 9b814cb

Please sign in to comment.