Skip to content

Commit

Permalink
Merge pull request #99 from openimis/test/groupImport
Browse files Browse the repository at this point in the history
Added TestCase for Group Upload
  • Loading branch information
delcroip authored Oct 31, 2024
2 parents 1dfba72 + 7bbf441 commit c192b22
Show file tree
Hide file tree
Showing 5 changed files with 180 additions and 15 deletions.
24 changes: 18 additions & 6 deletions social_protection/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ def _create_benefit_plan_data_upload_records(self, benefit_plan, workflow, uploa
workflow=workflow.name,
json_ext={"group_aggregation_column": group_aggregation_column}
)
record.save(username=self.user.username)
record.save(user=self.user)

def validate_import_beneficiaries(self, upload_id: uuid, individual_sources, benefit_plan: BenefitPlan):
dataframe = self._load_dataframe(individual_sources)
Expand Down Expand Up @@ -235,7 +235,11 @@ def synchronize_data_for_reporting(self, upload_id: uuid, benefit_plan: BenefitP
self._synchronize_beneficiary(benefit_plan, upload_id)

def _validate_possible_beneficiaries(self, dataframe: DataFrame, benefit_plan: BenefitPlan, upload_id: uuid, num_workers=4):
schema_dict = benefit_plan.beneficiary_data_schema

if isinstance(benefit_plan.beneficiary_data_schema, str):
schema_dict = json.loads(benefit_plan.beneficiary_data_schema)
else:
schema_dict = benefit_plan.beneficiary_data_schema
properties = schema_dict.get("properties", {})

calculation_uuid = SocialProtectionConfig.validation_calculation_uuid
Expand Down Expand Up @@ -289,7 +293,10 @@ def process_chunk(chunk, properties, unique_validations, calculation, calculatio

# Uniqueness Check
if "uniqueness" in field_properties and field in row:
field_validation['validations'][f'{field}_uniqueness'] = not unique_validations[field].loc[row.name]
field_validation['validations'][f'{field}_uniqueness'] = {
'success': not unique_validations[field].loc[row.name]
}


validated_dataframe.append(field_validation)

Expand Down Expand Up @@ -426,7 +433,7 @@ def _synchronize_individual(self, upload_id):
individual.json_ext.update(synch_status)
else:
individual.json_ext = synch_status
individual.save(username=self.user.username)
individual.save(user=self.user)

def _synchronize_beneficiary(self, benefit_plan, upload_id):
unique_uuids = list((
Expand All @@ -447,7 +454,7 @@ def _synchronize_beneficiary(self, benefit_plan, upload_id):
beneficiary.json_ext.update(synch_status)
else:
beneficiary.json_ext = synch_status
beneficiary.save(username=self.user.username)
beneficiary.save(user=self.user)


class BeneficiaryTaskCreatorService:
Expand Down Expand Up @@ -491,7 +498,7 @@ def _create_task(self, benefit_plan, upload_id, business_event):

data_upload = upload_record.data_upload
data_upload.status = IndividualDataSourceUpload.Status.WAITING_FOR_VERIFICATION
data_upload.save(username=self.user.username)
data_upload.save(user=self.user)

def __calculate_percentage_of_invalid_items(self, upload_id):
number_of_valid_items = len(fetch_summary_of_valid_items(upload_id))
Expand All @@ -505,3 +512,8 @@ def __calculate_percentage_of_invalid_items(self, upload_id):

percentage_of_invalid_items = round(percentage_of_invalid_items, 2)
return percentage_of_invalid_items


class GroupBeneficiaryImportService(BeneficiaryImportService):
pass
# TODO: create workflow upload/update groups and use it here
2 changes: 2 additions & 0 deletions social_protection/tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,5 @@
from .group_beneficiary_service_test import GroupBeneficiaryServiceTest
from .beneficiary_import_service_test import BeneficiaryImportServiceTest
from .beneficiary_gql_test import BeneficiaryGQLTest
# TODO: implement group upload workflow
# from .group_import_gql_test import GroupBeneficiaryImportGQLTest
143 changes: 143 additions & 0 deletions social_protection/tests/group_import_gql_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
from unittest import mock
import graphene
import random
import uuid
import string
from core.models import User
from core.models.openimis_graphql_test_case import openIMISGraphQLTestCase
from core.test_helpers import create_test_interactive_user
from social_protection import schema as sp_schema
from graphene import Schema
from graphene.test import Client
from graphene_django.utils.testing import GraphQLTestCase
from django.conf import settings
from graphql_jwt.shortcuts import get_token
from social_protection.tests.test_helpers import create_benefit_plan,\
create_group_with_individual, add_group_to_benefit_plan, create_individual, add_individual_to_group
from social_protection.services import GroupBeneficiaryService
from social_protection.models import IndividualDataSourceUpload
import json
from django.core.files.uploadedfile import SimpleUploadedFile
from tasks_management.models import Task, TaskExecutor, TaskGroup
from tasks_management.services import TaskService, TaskExecutorService, TaskGroupService
from tasks_management.tests.data import TaskDataMixin
from core.test_helpers import LogInHelper

class GroupBeneficiaryImportGQLTest(openIMISGraphQLTestCase, TaskDataMixin):
schema = Schema(query=sp_schema.Query)

class BaseTestContext:
def __init__(self, user):
self.user = user

class AnonymousUserContext:
user = mock.Mock(is_anonymous=True)

@classmethod
def setUpClass(cls):
super(GroupBeneficiaryImportGQLTest, cls).setUpClass()
cls.user = User.objects.filter(username='admin', i_user__isnull=False).first()
if not cls.user:
cls.user = create_test_interactive_user(username='admin')

cls.user_token = get_token(cls.user, cls.BaseTestContext(user=cls.user))
cls.benefit_plan = create_benefit_plan(cls.user.username, payload_override={
'code': 'GGQLTest',
'type': 'GROUP',
'beneficiary_data_schema': """{"$id": "https://example.com/beneficiares.schema.json", "type": "object", "title": "Record of beneficiares", "$schema": "http://json-schema.org/draft-04/schema#", "properties": {"email": {"type": "string", "description": "email address to contact with beneficiary", "validationCalculation": {"name": "EmailValidationStrategy"}}, "groupId": {"type": "string", "description": "Group categorization"}, "able_bodied": {"type": "boolean", "description": "Flag determining whether someone is able bodied or not"}, "national_id": {"type": "string", "uniqueness": true, "description": "national id"}, "educated_level": {"type": "string", "description": "The level of person when it comes to the school/education/studies"}, "recipient_info": {"type": "integer", "description": "main or not recipient_info"}, "chronic_illness": {"type": "boolean", "description": "Flag determining whether someone has such kind of illness or not"}, "national_id_type": {"type": "string", "description": "A type of national id"}, "number_of_elderly": {"type": "integer", "description": "Number of elderly"}, "number_of_children": {"type": "integer", "description": "Number of children"}, "beneficiary_data_source": {"type": "string", "description": "The source from where such beneficiary comes"}}, "description": "This document records the details beneficiares"}"""
})

cls.task_executor = LogInHelper().get_or_create_user_api(
username='TaskExecutor')
cls.init_data()

obj = TaskGroupService(cls.user).create({
**cls.task_group_add_payload_any,
"user_ids": [cls.task_executor.id, cls.user.id]
})

group_id = obj.get("data")["id"]
cls.task_group = TaskGroup.objects.get(id=group_id)
cls.task_group_service = TaskGroupService(cls.user)
cls.task_service = TaskService(cls.user)

def test_import_beneficiaries(self):
# Prepare the file to be uploaded
csv_content = (
"first_name,last_name,dob,email,able_bodied,national_id,educated_level,national_id_type,number_of_elderly,number_of_children,groupId,recipient_info\n"
"NTestPerson1AA,TestPerson1AA,1995-07-13,[email protected],False,111A11122,basic education,National ID Card,20,4,\"g12\",\"1\"\n"
"NTestPerson1BB,TestPerson1BB,1995-07-13,[email protected],False,123A12312S,basic education,National ID Card,20,4,\"g12\",\"0\"\n"
)

filename = F"{''.join(random.choices(string.ascii_uppercase + string.digits, k=10))}.csv"
csv_file = SimpleUploadedFile(
filename,
csv_content.encode("utf-8"),
content_type="text/csv"
)

# Prepare the JWT token for the request
headers = {
'Authorization': f'Bearer {self.user_token}'
}

# Prepare the payload
data = {
"file": csv_file,
"benefit_plan": str(self.benefit_plan.id),
"workflow_name": "Python Beneficiaries Upload",
"workflow_group": "socialProtection",
"group_aggregation_column": "groupId",
}

# Send the POST request to the import endpoint
response = self.client.post(
"/api/social_protection/import_beneficiaries/",
data,
format='multipart',
headers=headers,
)

# Assert the response status code
self.assertEqual(response.status_code, 200)
content = response.data
self.assertEqual(content['success'], True)

upload_uuid = content['data']['upload_uuid']

upload = IndividualDataSourceUpload.objects.get(id=upload_uuid)
self.assertEqual(
upload.status, IndividualDataSourceUpload.Status.WAITING_FOR_VERIFICATION,
F"Invalid upload status, should be waiting for verification, is {upload.status}. Error list: \n{upload.error}"
)

pending_task = Task.objects.all().order_by('date_created').last()
self.assertEqual(pending_task.json_ext['source_name'], filename, "Task for approving upload of group not found")


mut_id_task_update = uuid.uuid4()
raw_input = F"""clientMutationId: "{mut_id_task_update}"\n id: \"{pending_task.uuid}\"\n status: ACCEPTED\n taskGroupId: \"{self.task_group.uuid}\"\n """
content=self.send_mutation("updateTask", raw_input, self.user_token, raw=True)
self.assertEqual(content['data']['mutationLogs']['edges'][0]['node']['status'], 2, "Fail during Task Group assingnment")

# self.task_service.complete_task()

input_param = {}
mut_id = uuid.uuid4()
raw_input = F"""clientMutationId: "{mut_id}"\n id: "{pending_task.uuid}"\n businessStatus: "{{\\"{self.user.id}\\":\\"APPROVED\\"}}"\n"""
content=self.send_mutation("resolveTask", raw_input, self.user_token, raw=True)
self.assertEqual(content['data']['mutationLogs']['edges'][0]['node']['status'], 2, "Fail during Task resolve")

pending_task.refresh_from_db()
upload.refresh_from_db()


## At the moment Test Fail on Beneficiary Synchronization in self._synchronize_beneficiary(benefit_plan, upload_id) in
# openimis-be-social_protection_py/social_protection/services.py
self.assertEqual(
upload.status, IndividualDataSourceUpload.Status.SUCCESS,
F"Failure during individuals record upload. Expected success, actual status: {upload.status}. \nErrors: {upload.error}"
)

# TODO: Add assertion for the group creation
# TODO: Add assertion for beneficiary creation
22 changes: 14 additions & 8 deletions social_protection/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from individual.models import IndividualDataSource
from social_protection.apps import SocialProtectionConfig
from social_protection.models import BenefitPlan
from social_protection.services import BeneficiaryImportService
from social_protection.services import BeneficiaryImportService, GroupBeneficiaryImportService
from workflow.services import WorkflowService

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -62,9 +62,15 @@ def import_beneficiaries(request):
user = request.user
import_file, workflow, benefit_plan, group_aggregation_column = _resolve_import_beneficiaries_args(request)
_handle_file_upload(import_file, benefit_plan)
result = BeneficiaryImportService(user).import_beneficiaries(
import_file, benefit_plan, workflow, group_aggregation_column
)
if benefit_plan.type == BenefitPlan.BenefitPlanType.INDIVIDUAL_TYPE:
result = BeneficiaryImportService(user).import_beneficiaries(
import_file, benefit_plan, workflow, group_aggregation_column
)
else:
raise NotImplementedError(_("cannot updload groups"))
result = GroupBeneficiaryImportService(user).import_beneficiaries(
import_file, benefit_plan, workflow, group_aggregation_column
)
if not result.get('success'):
raise ValueError('{}: {}'.format(result.get("message"), result.get("details")))

Expand Down Expand Up @@ -223,8 +229,11 @@ def _resolve_import_beneficiaries_args(request):
raise ValueError(f'Workflow name not provided')
if not workflow_group:
raise ValueError(f'Workflow group not provided')
benefit_plan = BenefitPlan.objects.filter(id=benefit_plan_uuid).first()
if not benefit_plan:
raise ValueError('Benefit Plan not found: {}'.format(benefit_plan_uuid))
if (group_aggregation_column and
BenefitPlan.objects.filter(uuid=benefit_plan_uuid).first().type != BenefitPlan.BenefitPlanType.GROUP_TYPE):
benefit_plan.type != BenefitPlan.BenefitPlanType.GROUP_TYPE):
raise ValueError(f'Group aggregation only for group type benefit plans')

result = WorkflowService.get_workflows(workflow_name, workflow_group)
Expand All @@ -239,10 +248,7 @@ def _resolve_import_beneficiaries_args(request):
raise ValueError('Multiple workflows found: group={} name={}'.format(workflow_group, workflow_name))

workflow = workflows[0]
benefit_plan = BenefitPlan.objects.filter(uuid=benefit_plan_uuid).first()

if not benefit_plan:
raise ValueError('Benefit Plan not found: {}'.format(benefit_plan_uuid))

return import_file, workflow, benefit_plan, group_aggregation_column

Expand Down
4 changes: 3 additions & 1 deletion social_protection/workflows/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
Functionalities shared between different python workflows.
"""
import json
import logging
from abc import ABCMeta, abstractmethod
from typing import Iterable
Expand Down Expand Up @@ -53,7 +54,8 @@ def validate_dataframe_headers(self, is_update=False):
4. If action is data upload then 'ID' unique identifier is required as well.
"""
df_headers = set(self.df.columns)
schema_properties = set(self.schema.get('properties', {}).keys())
schema = json.loads(self.schema)
schema_properties = set(schema.get('properties', {}).keys())
required_headers = {'first_name', 'last_name', 'dob', 'id'}
if is_update:
required_headers.add('ID')
Expand Down

0 comments on commit c192b22

Please sign in to comment.