Skip to content

Commit

Permalink
fixes WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
delcroip committed Oct 31, 2024
1 parent fa0054a commit 31cf35a
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 12 deletions.
19 changes: 13 additions & 6 deletions social_protection/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ def _create_benefit_plan_data_upload_records(self, benefit_plan, workflow, uploa
workflow=workflow.name,
json_ext={"group_aggregation_column": group_aggregation_column}
)
record.save(username=self.user.username)
record.save(user=self.user)

def validate_import_beneficiaries(self, upload_id: uuid, individual_sources, benefit_plan: BenefitPlan):
dataframe = self._load_dataframe(individual_sources)
Expand Down Expand Up @@ -235,7 +235,11 @@ def synchronize_data_for_reporting(self, upload_id: uuid, benefit_plan: BenefitP
self._synchronize_beneficiary(benefit_plan, upload_id)

def _validate_possible_beneficiaries(self, dataframe: DataFrame, benefit_plan: BenefitPlan, upload_id: uuid, num_workers=4):
schema_dict = benefit_plan.beneficiary_data_schema

if isinstance(benefit_plan.beneficiary_data_schema, str):
schema_dict = json.loads(benefit_plan.beneficiary_data_schema)
else:
schema_dict = benefit_plan.beneficiary_data_schema
properties = schema_dict.get("properties", {})

calculation_uuid = SocialProtectionConfig.validation_calculation_uuid
Expand Down Expand Up @@ -289,7 +293,10 @@ def process_chunk(chunk, properties, unique_validations, calculation, calculatio

# Uniqueness Check
if "uniqueness" in field_properties and field in row:
field_validation['validations'][f'{field}_uniqueness'] = not unique_validations[field].loc[row.name]
field_validation['validations'][f'{field}_uniqueness'] = {
'success': not unique_validations[field].loc[row.name]
}


validated_dataframe.append(field_validation)

Expand Down Expand Up @@ -426,7 +433,7 @@ def _synchronize_individual(self, upload_id):
individual.json_ext.update(synch_status)
else:
individual.json_ext = synch_status
individual.save(username=self.user.username)
individual.save(user=self.user)

def _synchronize_beneficiary(self, benefit_plan, upload_id):
unique_uuids = list((
Expand All @@ -447,7 +454,7 @@ def _synchronize_beneficiary(self, benefit_plan, upload_id):
beneficiary.json_ext.update(synch_status)
else:
beneficiary.json_ext = synch_status
beneficiary.save(username=self.user.username)
beneficiary.save(user=self.user)


class BeneficiaryTaskCreatorService:
Expand Down Expand Up @@ -491,7 +498,7 @@ def _create_task(self, benefit_plan, upload_id, business_event):

data_upload = upload_record.data_upload
data_upload.status = IndividualDataSourceUpload.Status.WAITING_FOR_VERIFICATION
data_upload.save(username=self.user.username)
data_upload.save(user=self.user)

def __calculate_percentage_of_invalid_items(self, upload_id):
number_of_valid_items = len(fetch_summary_of_valid_items(upload_id))
Expand Down
1 change: 1 addition & 0 deletions social_protection/tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@
from .group_beneficiary_service_test import GroupBeneficiaryServiceTest
from .beneficiary_import_service_test import BeneficiaryImportServiceTest
from .beneficiary_gql_test import BeneficiaryGQLTest
from .group_import_gql_test import GroupBeneficiaryImportGQLTest
2 changes: 1 addition & 1 deletion social_protection/tests/group_import_gql_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def test_import_beneficiaries(self):
# Prepare the payload
data = {
"file": csv_file,
"benefit_plan": "7b5c1699-9ba8-4d53-9a70-183be61f009c",
"benefit_plan": str(self.benefit_plan.id),
"workflow_name": "Python Beneficiaries Upload",
"workflow_group": "socialProtection",
"group_aggregation_column": "groupId",
Expand Down
8 changes: 4 additions & 4 deletions social_protection/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,11 @@ def _resolve_import_beneficiaries_args(request):
raise ValueError(f'Workflow name not provided')
if not workflow_group:
raise ValueError(f'Workflow group not provided')
benefit_plan = BenefitPlan.objects.filter(id=benefit_plan_uuid).first()
if not benefit_plan:
raise ValueError('Benefit Plan not found: {}'.format(benefit_plan_uuid))
if (group_aggregation_column and
BenefitPlan.objects.filter(uuid=benefit_plan_uuid).first().type != BenefitPlan.BenefitPlanType.GROUP_TYPE):
benefit_plan.type != BenefitPlan.BenefitPlanType.GROUP_TYPE):
raise ValueError(f'Group aggregation only for group type benefit plans')

result = WorkflowService.get_workflows(workflow_name, workflow_group)
Expand All @@ -239,10 +242,7 @@ def _resolve_import_beneficiaries_args(request):
raise ValueError('Multiple workflows found: group={} name={}'.format(workflow_group, workflow_name))

workflow = workflows[0]
benefit_plan = BenefitPlan.objects.filter(uuid=benefit_plan_uuid).first()

if not benefit_plan:
raise ValueError('Benefit Plan not found: {}'.format(benefit_plan_uuid))

return import_file, workflow, benefit_plan, group_aggregation_column

Expand Down
4 changes: 3 additions & 1 deletion social_protection/workflows/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
Functionalities shared between different python workflows.
"""
import json
import logging
from abc import ABCMeta, abstractmethod
from typing import Iterable
Expand Down Expand Up @@ -53,7 +54,8 @@ def validate_dataframe_headers(self, is_update=False):
4. If action is data upload then 'ID' unique identifier is required as well.
"""
df_headers = set(self.df.columns)
schema_properties = set(self.schema.get('properties', {}).keys())
schema = json.loads(self.schema)
schema_properties = set(schema.get('properties', {}).keys())
required_headers = {'first_name', 'last_name', 'dob', 'id'}
if is_update:
required_headers.add('ID')
Expand Down

0 comments on commit 31cf35a

Please sign in to comment.