diff --git a/social_protection/services.py b/social_protection/services.py
index f9da7eb..b223194 100644
--- a/social_protection/services.py
+++ b/social_protection/services.py
@@ -180,7 +180,7 @@ def _create_benefit_plan_data_upload_records(self, benefit_plan, workflow, uploa
             workflow=workflow.name,
             json_ext={"group_aggregation_column": group_aggregation_column}
         )
-        record.save(username=self.user.username)
+        record.save(user=self.user)
 
     def validate_import_beneficiaries(self, upload_id: uuid, individual_sources, benefit_plan: BenefitPlan):
         dataframe = self._load_dataframe(individual_sources)
@@ -235,7 +235,11 @@ def synchronize_data_for_reporting(self, upload_id: uuid, benefit_plan: BenefitP
         self._synchronize_beneficiary(benefit_plan, upload_id)
 
     def _validate_possible_beneficiaries(self, dataframe: DataFrame, benefit_plan: BenefitPlan, upload_id: uuid, num_workers=4):
-        schema_dict = benefit_plan.beneficiary_data_schema
+
+        if isinstance(benefit_plan.beneficiary_data_schema, str):
+            schema_dict = json.loads(benefit_plan.beneficiary_data_schema)
+        else:
+            schema_dict = benefit_plan.beneficiary_data_schema
         properties = schema_dict.get("properties", {})
         calculation_uuid = SocialProtectionConfig.validation_calculation_uuid
@@ -289,7 +293,10 @@ def process_chunk(chunk, properties, unique_validations, calculation, calculatio
 
             # Uniqueness Check
             if "uniqueness" in field_properties and field in row:
-                field_validation['validations'][f'{field}_uniqueness'] = not unique_validations[field].loc[row.name]
+                field_validation['validations'][f'{field}_uniqueness'] = {
+                    'success': not unique_validations[field].loc[row.name]
+                }
+
         validated_dataframe.append(field_validation)
@@ -426,7 +433,7 @@ def _synchronize_individual(self, upload_id):
             individual.json_ext.update(synch_status)
         else:
             individual.json_ext = synch_status
-        individual.save(username=self.user.username)
+        individual.save(user=self.user)
 
     def _synchronize_beneficiary(self, benefit_plan, upload_id):
         unique_uuids = list((
@@ -447,7 +454,7 @@ def _synchronize_beneficiary(self, benefit_plan, upload_id):
             beneficiary.json_ext.update(synch_status)
         else:
             beneficiary.json_ext = synch_status
-        beneficiary.save(username=self.user.username)
+        beneficiary.save(user=self.user)
 
 
 class BeneficiaryTaskCreatorService:
@@ -491,7 +498,7 @@ def _create_task(self, benefit_plan, upload_id, business_event):
 
         data_upload = upload_record.data_upload
         data_upload.status = IndividualDataSourceUpload.Status.WAITING_FOR_VERIFICATION
-        data_upload.save(username=self.user.username)
+        data_upload.save(user=self.user)
 
     def __calculate_percentage_of_invalid_items(self, upload_id):
         number_of_valid_items = len(fetch_summary_of_valid_items(upload_id))
@@ -505,3 +512,8 @@ def __calculate_percentage_of_invalid_items(self, upload_id):
         percentage_of_invalid_items = round(percentage_of_invalid_items, 2)
 
         return percentage_of_invalid_items
+
+
+class GroupBeneficiaryImportService(BeneficiaryImportService):
+    pass
+    # TODO: create workflow upload/update groups and use it here
\ No newline at end of file
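Two themes in the hunks above: every record.save(username=self.user.username) becomes record.save(user=self.user), presumably tracking an updated save signature in core, and _validate_possible_beneficiaries now tolerates beneficiary_data_schema arriving either as a JSON string or as an already-parsed dict. A minimal sketch of that normalization as a reusable helper (the helper name is hypothetical, not part of the module):

    import json
    from typing import Union

    def normalize_schema(schema: Union[str, dict]) -> dict:
        # Parse the schema when it is persisted as a JSON string;
        # return it unchanged when it is already a dict.
        if isinstance(schema, str):
            return json.loads(schema)
        return schema

Sharing one helper here and in workflows/utils.py (changed below) would keep the two call sites from drifting apart.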
diff --git a/social_protection/tests/__init__.py b/social_protection/tests/__init__.py
index 1bbdfa7..8f61055 100644
--- a/social_protection/tests/__init__.py
+++ b/social_protection/tests/__init__.py
@@ -3,3 +3,5 @@
 from .group_beneficiary_service_test import GroupBeneficiaryServiceTest
 from .beneficiary_import_service_test import BeneficiaryImportServiceTest
 from .beneficiary_gql_test import BeneficiaryGQLTest
+# TODO: implement group upload workflow
+# from .group_import_gql_test import GroupBeneficiaryImportGQLTest
\ No newline at end of file
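The new test below creates a GROUP-type benefit plan whose beneficiary_data_schema is passed as a JSON string, and its national_id property sets "uniqueness": true, so it exercises both the string-parsing guard and the uniqueness hunk added to services.py above. A small illustrative sketch (benefit_plan stands for the plan created in the test's setUpClass):

    import json

    schema = json.loads(benefit_plan.beneficiary_data_schema)
    # Every property flagged for the per-field uniqueness check;
    # for the schema below this yields ["national_id"].
    unique_fields = [
        name for name, props in schema.get("properties", {}).items()
        if props.get("uniqueness")
    ]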
diff --git a/social_protection/tests/group_import_gql_test.py b/social_protection/tests/group_import_gql_test.py
new file mode 100644
index 0000000..8e39f6d
--- /dev/null
+++ b/social_protection/tests/group_import_gql_test.py
@@ -0,0 +1,143 @@
+from unittest import mock
+import graphene
+import random
+import uuid
+import string
+from core.models import User
+from core.models.openimis_graphql_test_case import openIMISGraphQLTestCase
+from core.test_helpers import create_test_interactive_user
+from social_protection import schema as sp_schema
+from graphene import Schema
+from graphene.test import Client
+from graphene_django.utils.testing import GraphQLTestCase
+from django.conf import settings
+from graphql_jwt.shortcuts import get_token
+from social_protection.tests.test_helpers import create_benefit_plan,\
+    create_group_with_individual, add_group_to_benefit_plan, create_individual, add_individual_to_group
+from social_protection.services import GroupBeneficiaryService
+from social_protection.models import IndividualDataSourceUpload
+import json
+from django.core.files.uploadedfile import SimpleUploadedFile
+from tasks_management.models import Task, TaskExecutor, TaskGroup
+from tasks_management.services import TaskService, TaskExecutorService, TaskGroupService
+from tasks_management.tests.data import TaskDataMixin
+from core.test_helpers import LogInHelper
+
+
+class GroupBeneficiaryImportGQLTest(openIMISGraphQLTestCase, TaskDataMixin):
+    schema = Schema(query=sp_schema.Query)
+
+    class BaseTestContext:
+        def __init__(self, user):
+            self.user = user
+
+    class AnonymousUserContext:
+        user = mock.Mock(is_anonymous=True)
+
+    @classmethod
+    def setUpClass(cls):
+        super(GroupBeneficiaryImportGQLTest, cls).setUpClass()
+        cls.user = User.objects.filter(username='admin', i_user__isnull=False).first()
+        if not cls.user:
+            cls.user = create_test_interactive_user(username='admin')
+        cls.user_token = get_token(cls.user, cls.BaseTestContext(user=cls.user))
+        cls.benefit_plan = create_benefit_plan(cls.user.username, payload_override={
+            'code': 'GGQLTest',
+            'type': 'GROUP',
+            'beneficiary_data_schema': """{"$id": "https://example.com/beneficiares.schema.json", "type": "object", "title": "Record of beneficiares", "$schema": "http://json-schema.org/draft-04/schema#", "properties": {"email": {"type": "string", "description": "email address to contact with beneficiary", "validationCalculation": {"name": "EmailValidationStrategy"}}, "groupId": {"type": "string", "description": "Group categorization"}, "able_bodied": {"type": "boolean", "description": "Flag determining whether someone is able bodied or not"}, "national_id": {"type": "string", "uniqueness": true, "description": "national id"}, "educated_level": {"type": "string", "description": "The level of person when it comes to the school/education/studies"}, "recipient_info": {"type": "integer", "description": "main or not recipient_info"}, "chronic_illness": {"type": "boolean", "description": "Flag determining whether someone has such kind of illness or not"}, "national_id_type": {"type": "string", "description": "A type of national id"}, "number_of_elderly": {"type": "integer", "description": "Number of elderly"}, "number_of_children": {"type": "integer", "description": "Number of children"}, "beneficiary_data_source": {"type": "string", "description": "The source from where such beneficiary comes"}}, "description": "This document records the details beneficiares"}"""
+        })
+
+        cls.task_executor = LogInHelper().get_or_create_user_api(
+            username='TaskExecutor')
+        cls.init_data()
+
+        obj = TaskGroupService(cls.user).create({
+            **cls.task_group_add_payload_any,
+            "user_ids": [cls.task_executor.id, cls.user.id]
+        })
+
+        group_id = obj.get("data")["id"]
+        cls.task_group = TaskGroup.objects.get(id=group_id)
+        cls.task_group_service = TaskGroupService(cls.user)
+        cls.task_service = TaskService(cls.user)
+
+    def test_import_beneficiaries(self):
+        # Prepare the file to be uploaded
+        csv_content = (
+            "first_name,last_name,dob,email,able_bodied,national_id,educated_level,national_id_type,number_of_elderly,number_of_children,groupId,recipient_info\n"
+            "NTestPerson1AA,TestPerson1AA,1995-07-13,maixl21@test.com,False,111A11122,basic education,National ID Card,20,4,\"g12\",\"1\"\n"
+            "NTestPerson1BB,TestPerson1BB,1995-07-13,m2xail2@test.com,False,123A12312S,basic education,National ID Card,20,4,\"g12\",\"0\"\n"
+        )
+
+        filename = F"{''.join(random.choices(string.ascii_uppercase + string.digits, k=10))}.csv"
+        csv_file = SimpleUploadedFile(
+            filename,
+            csv_content.encode("utf-8"),
+            content_type="text/csv"
+        )
+
+        # Prepare the JWT token for the request
+        headers = {
+            'Authorization': f'Bearer {self.user_token}'
+        }
+
+        # Prepare the payload
+        data = {
+            "file": csv_file,
+            "benefit_plan": str(self.benefit_plan.id),
+            "workflow_name": "Python Beneficiaries Upload",
+            "workflow_group": "socialProtection",
+            "group_aggregation_column": "groupId",
+        }
+
+        # Send the POST request to the import endpoint
+        response = self.client.post(
+            "/api/social_protection/import_beneficiaries/",
+            data,
+            format='multipart',
+            headers=headers,
+        )
+
+        # Assert the response status code
+        self.assertEqual(response.status_code, 200)
+        content = response.data
+        self.assertEqual(content['success'], True)
+
+        upload_uuid = content['data']['upload_uuid']
+        upload = IndividualDataSourceUpload.objects.get(id=upload_uuid)
+        self.assertEqual(
+            upload.status, IndividualDataSourceUpload.Status.WAITING_FOR_VERIFICATION,
+            F"Invalid upload status, should be waiting for verification, is {upload.status}. Error list:\n{upload.error}"
+        )
+
+        pending_task = Task.objects.all().order_by('date_created').last()
+        self.assertEqual(pending_task.json_ext['source_name'], filename, "Task for approving upload of group not found")
+
+        mut_id_task_update = uuid.uuid4()
+        raw_input = F"""clientMutationId: "{mut_id_task_update}"\n id: \"{pending_task.uuid}\"\n status: ACCEPTED\n taskGroupId: \"{self.task_group.uuid}\"\n """
+        content = self.send_mutation("updateTask", raw_input, self.user_token, raw=True)
+        self.assertEqual(content['data']['mutationLogs']['edges'][0]['node']['status'], 2, "Fail during Task Group assignment")
+
+        # self.task_service.complete_task()
+
+        input_param = {}
+        mut_id = uuid.uuid4()
+        raw_input = F"""clientMutationId: "{mut_id}"\n id: "{pending_task.uuid}"\n businessStatus: "{{\\"{self.user.id}\\":\\"APPROVED\\"}}"\n"""
+        content = self.send_mutation("resolveTask", raw_input, self.user_token, raw=True)
+        self.assertEqual(content['data']['mutationLogs']['edges'][0]['node']['status'], 2, "Fail during Task resolve")
+
+        pending_task.refresh_from_db()
+        upload.refresh_from_db()
+
+        # At the moment the test fails on beneficiary synchronization in
+        # self._synchronize_beneficiary(benefit_plan, upload_id) in
+        # openimis-be-social_protection_py/social_protection/services.py
+        self.assertEqual(
+            upload.status, IndividualDataSourceUpload.Status.SUCCESS,
+            F"Failure during individuals record upload. Expected success, actual status: {upload.status}.\nErrors: {upload.error}"
+        )
+
+        # TODO: Add assertion for the group creation
+        # TODO: Add assertion for beneficiary creation
\ No newline at end of file
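The resolveTask mutation above embeds a JSON object inside a raw GraphQL string, hence the doubled escaping of businessStatus. The same payload can be built with json.dumps before escaping (a sketch; the helper name is hypothetical, the {user_id: "APPROVED"} shape is the one used in the test):

    import json

    def business_status_for(user_id) -> str:
        # Approval vote keyed by the approving user's id, as expected by the
        # maker-checker resolveTask flow exercised above.
        return json.dumps({str(user_id): "APPROVED"})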
diff --git a/social_protection/views.py b/social_protection/views.py
index cb35486..0b7cc64 100644
--- a/social_protection/views.py
+++ b/social_protection/views.py
@@ -14,7 +14,7 @@ from individual.models import IndividualDataSource
 from social_protection.apps import SocialProtectionConfig
 from social_protection.models import BenefitPlan
-from social_protection.services import BeneficiaryImportService
+from social_protection.services import BeneficiaryImportService, GroupBeneficiaryImportService
 from workflow.services import WorkflowService
 
 logger = logging.getLogger(__name__)
@@ -62,9 +62,15 @@ def import_beneficiaries(request):
         user = request.user
         import_file, workflow, benefit_plan, group_aggregation_column = _resolve_import_beneficiaries_args(request)
         _handle_file_upload(import_file, benefit_plan)
-        result = BeneficiaryImportService(user).import_beneficiaries(
-            import_file, benefit_plan, workflow, group_aggregation_column
-        )
+        if benefit_plan.type == BenefitPlan.BenefitPlanType.INDIVIDUAL_TYPE:
+            result = BeneficiaryImportService(user).import_beneficiaries(
+                import_file, benefit_plan, workflow, group_aggregation_column
+            )
+        else:
+            raise NotImplementedError(_("cannot upload groups"))
+            result = GroupBeneficiaryImportService(user).import_beneficiaries(
+                import_file, benefit_plan, workflow, group_aggregation_column
+            )
 
         if not result.get('success'):
             raise ValueError('{}: {}'.format(result.get("message"), result.get("details")))
@@ -223,8 +229,11 @@ def _resolve_import_beneficiaries_args(request):
         raise ValueError(f'Workflow name not provided')
     if not workflow_group:
         raise ValueError(f'Workflow group not provided')
+    benefit_plan = BenefitPlan.objects.filter(id=benefit_plan_uuid).first()
+    if not benefit_plan:
+        raise ValueError('Benefit Plan not found: {}'.format(benefit_plan_uuid))
     if (group_aggregation_column and
-            BenefitPlan.objects.filter(uuid=benefit_plan_uuid).first().type != BenefitPlan.BenefitPlanType.GROUP_TYPE):
+            benefit_plan.type != BenefitPlan.BenefitPlanType.GROUP_TYPE):
         raise ValueError(f'Group aggregation only for group type benefit plans')
 
     result = WorkflowService.get_workflows(workflow_name, workflow_group)
@@ -239,10 +248,7 @@ def _resolve_import_beneficiaries_args(request):
         raise ValueError('Multiple workflows found: group={} name={}'.format(workflow_group, workflow_name))
 
     workflow = workflows[0]
-    benefit_plan = BenefitPlan.objects.filter(uuid=benefit_plan_uuid).first()
-    if not benefit_plan:
-        raise ValueError('Benefit Plan not found: {}'.format(benefit_plan_uuid))
 
     return import_file, workflow, benefit_plan, group_aggregation_column
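Note that the group branch above deliberately raises before reaching the GroupBeneficiaryImportService call; the unreachable call is left in place as a marker for the pending group upload workflow. For reference, the individual-type path of the endpoint is exercised outside the test suite roughly like this (a sketch using the requests library; token, benefit_plan_uuid and the file path are placeholders, the field names are the ones read by _resolve_import_beneficiaries_args):

    import requests

    with open("beneficiaries.csv", "rb") as fh:
        response = requests.post(
            "http://localhost:8000/api/social_protection/import_beneficiaries/",
            headers={"Authorization": f"Bearer {token}"},
            files={"file": fh},
            data={
                "benefit_plan": benefit_plan_uuid,
                "workflow_name": "Python Beneficiaries Upload",
                "workflow_group": "socialProtection",
                # Only valid for GROUP-type benefit plans, per the check above.
                "group_aggregation_column": "groupId",
            },
        )
    response.raise_for_status()
    upload_uuid = response.json()["data"]["upload_uuid"]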
""" df_headers = set(self.df.columns) - schema_properties = set(self.schema.get('properties', {}).keys()) + schema = json.loads(self.schema) + schema_properties = set(schema.get('properties', {}).keys()) required_headers = {'first_name', 'last_name', 'dob', 'id'} if is_update: required_headers.add('ID')