Skip to content

Commit

Permalink
Merge pull request #97 from openimis/performance-fix
Browse files Browse the repository at this point in the history
CM-1004: Performance fixes
  • Loading branch information
jdolkowski authored Oct 8, 2024
2 parents e5fb9e5 + ad74ed4 commit b7c5359
Show file tree
Hide file tree
Showing 4 changed files with 142 additions and 50 deletions.
121 changes: 91 additions & 30 deletions social_protection/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import logging
import uuid

import concurrent.futures
import math
import pandas as pd
from django.core.files.uploadedfile import InMemoryUploadedFile
from django.db import transaction
Expand Down Expand Up @@ -232,31 +234,66 @@ def synchronize_data_for_reporting(self, upload_id: uuid, benefit_plan: BenefitP
self._synchronize_individual(upload_id)
self._synchronize_beneficiary(benefit_plan, upload_id)

def _validate_possible_beneficiaries(self, dataframe: DataFrame, benefit_plan: BenefitPlan, upload_id: uuid, num_workers=4):
    """Validate an import dataframe against the benefit plan's beneficiary schema.

    The dataframe is split into up to ``num_workers`` chunks that are validated
    in parallel worker processes; accumulated validation errors are then
    persisted in one bulk write.

    :param dataframe: rows to validate (one row per candidate beneficiary)
    :param benefit_plan: plan whose ``beneficiary_data_schema`` drives validation
    :param upload_id: id of the IndividualDataSourceUpload being processed
    :param num_workers: number of worker processes (default 4, backward-compatible)
    :return: tuple of (list of per-row validation dicts, summary of broken items)
    """
    schema_dict = benefit_plan.beneficiary_data_schema
    properties = schema_dict.get("properties", {})

    calculation_uuid = SocialProtectionConfig.validation_calculation_uuid
    calculation = get_calculation_object(calculation_uuid)

    # Pre-compute duplicate masks over the WHOLE dataframe once, so each worker
    # can answer uniqueness questions without seeing the other chunks.
    unique_fields = [field for field, props in properties.items() if "uniqueness" in props]
    unique_validations = {
        field: dataframe[field].duplicated(keep=False)
        for field in unique_fields
    }

    validated_dataframe = []
    if not dataframe.empty:
        # max(1, ...) guards against a zero chunk size on tiny/empty input:
        # math.ceil(0 / num_workers) == 0 would make range() raise ValueError.
        chunk_size = max(1, math.ceil(len(dataframe) / num_workers))
        # iloc makes the positional slicing explicit regardless of index type.
        data_chunks = [dataframe.iloc[i:i + chunk_size] for i in range(0, len(dataframe), chunk_size)]

        with concurrent.futures.ProcessPoolExecutor(max_workers=num_workers) as executor:
            futures = [executor.submit(
                self.process_chunk,
                chunk,
                properties,
                unique_validations,
                calculation,
                calculation_uuid
            ) for chunk in data_chunks]

            for future in concurrent.futures.as_completed(futures):
                validated_dataframe.extend(future.result())

    self.save_validation_error_in_data_source_bulk(validated_dataframe)
    invalid_items = fetch_summary_of_broken_items(upload_id)
    return validated_dataframe, invalid_items

def validate_row(row):
@staticmethod
def process_chunk(chunk, properties, unique_validations, calculation, calculation_uuid):
validated_dataframe = []
for _, row in chunk.iterrows():
field_validation = {'row': row.to_dict(), 'validations': {}}
for field, field_properties in properties.items():
if "validationCalculation" in field_properties:
if field in row:
field_validation['validations'][f'{field}'] = self._handle_validation_calculation(
row, field, field_properties
)
if "uniqueness" in field_properties:
if field in row:
field_validation['validations'][f'{field}_uniqueness'] = self._handle_uniqueness(
row, field, field_properties, benefit_plan, dataframe
)

# Validation Calculation
if "validationCalculation" in field_properties and field in row:
validation_name = field_properties["validationCalculation"]["name"]
field_validation['validations'][field] = calculation.calculate_if_active_for_object(
validation_name,
calculation_uuid,
field_name=field,
field_value=row[field]
)

# Uniqueness Check
if "uniqueness" in field_properties and field in row:
field_validation['validations'][f'{field}_uniqueness'] = not unique_validations[field].loc[row.name]

validated_dataframe.append(field_validation)
self.__save_validation_error_in_data_source(row, field_validation)
return row

dataframe.apply(validate_row, axis='columns')
invalid_items = fetch_summary_of_broken_items(upload_id)
return validated_dataframe, invalid_items
return validated_dataframe

def _handle_uniqueness(self, row, field, field_properties, benefit_plan, dataframe):
unique_class_validation = SocialProtectionConfig.unique_class_validation
Expand Down Expand Up @@ -304,7 +341,20 @@ def _load_import_file(self, import_file) -> pd.DataFrame:
return self.import_loaders[import_file.content_type](import_file)

def _save_data_source(self, dataframe: pd.DataFrame, upload: IndividualDataSourceUpload):
    """Persist every dataframe row as an IndividualDataSource in one bulk INSERT.

    Replaces the legacy per-row ``dataframe.apply(self._save_row, ...)`` path —
    keeping both would write each row twice.

    :param dataframe: rows to store; each row becomes one data-source record
    :param upload: the upload every created record is attached to
    """
    data_source_objects = []

    for _, row in dataframe.iterrows():
        ds = IndividualDataSource(
            upload=upload,
            # Round-trip through JSON so numpy scalars / NaN serialize cleanly.
            json_ext=json.loads(row.to_json()),
            validations={},
            # bulk_create skips the audited save(); set audit fields and uuid
            # explicitly so the records are complete.
            user_created=self.user,
            user_updated=self.user,
            uuid=uuid.uuid4()
        )
        data_source_objects.append(ds)

    IndividualDataSource.objects.bulk_create(data_source_objects)

def _save_row(self, row, upload):
ds = IndividualDataSource(upload=upload, json_ext=json.loads(row.to_json()), validations={})
Expand Down Expand Up @@ -339,18 +389,29 @@ def _trigger_workflow(self,
upload.save(username=self.user.login_name)
return upload

def __save_validation_error_in_data_source(self, row, field_validation):
    """Record the failed validations of a single row on its IndividualDataSource.

    Collects every non-successful validation entry into a
    ``validation_errors`` list and saves it on the matching record.
    """
    failed = [
        {
            "field_name": value['field_name'],
            "note": value['note']
        }
        for value in field_validation['validations'].values()
        if not value['success']
    ]
    individual_data_source = IndividualDataSource.objects.get(id=row['id'])
    individual_data_source.validations = {'validation_errors': failed}
    individual_data_source.save(username=self.user.username)
def save_validation_error_in_data_source_bulk(self, validated_dataframe):
    """Write validation errors for all validated rows with a single bulk UPDATE.

    :param validated_dataframe: list of {'row': ..., 'validations': ...} dicts
        as produced by chunk validation; each row dict must carry an 'id'.
    """
    pending = []

    for entry in validated_dataframe:
        row_data = entry['row']
        # Keep only the validations that did not succeed (missing 'success'
        # counts as a failure).
        failures = [
            {
                "field_name": result.get('field_name'),
                "note": result.get('note')
            }
            for result in entry['validations'].values()
            if not result.get('success', False)
        ]
        pending.append(
            IndividualDataSource(
                id=row_data['id'],
                validations={'validation_errors': failures}
            )
        )

    if pending:
        IndividualDataSource.objects.bulk_update(pending, ['validations'])

def _synchronize_individual(self, upload_id):
individuals_to_update = Individual.objects.filter(
Expand Down
10 changes: 8 additions & 2 deletions social_protection/signals/on_confirm_enrollment_of_group.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import uuid

from django.core.exceptions import ValidationError
from individual.models import (
Expand Down Expand Up @@ -53,14 +54,19 @@ def on_confirm_enrollment_of_group(**kwargs):
group_id__in=group_ids,
role=GroupIndividual.Role.HEAD
).distinct()
data_source_objects = []
for group_individual in group_individuals:
source = IndividualDataSource(
upload=upload,
individual=group_individual.individual,
json_ext=group_individual.individual.json_ext,
validations={}
validations={},
user_created=user,
user_updated=user,
uuid=uuid.uuid4(),
)
source.save(username=user.login_name)
data_source_objects.append(source)
IndividualDataSource.objects.bulk_create(data_source_objects)
json_ext = {
'source_name': upload_record.data_upload.source_name,
'workflow': upload_record.workflow,
Expand Down
23 changes: 17 additions & 6 deletions social_protection/signals/on_confirm_enrollment_of_individual.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import uuid

from django.core.exceptions import ValidationError
from individual.models import (
Expand Down Expand Up @@ -45,14 +46,19 @@ def on_confirm_enrollment_of_individual(**kwargs):
workflow="Enrollment"
)
upload_record.save(username=user.username)
data_source_objects = []
for individual in individuals_to_upload:
source = IndividualDataSource(
uuid=uuid.uuid4(),
user_created=user,
user_updated=user,
upload=upload,
individual=individual,
json_ext=individual.json_ext,
validations={}
)
source.save(username=user.login_name)
data_source_objects.append(source)
IndividualDataSource.objects.bulk_create(data_source_objects)
json_ext = {
'source_name': upload_record.data_upload.source_name,
'workflow': upload_record.workflow,
Expand All @@ -70,15 +76,20 @@ def on_confirm_enrollment_of_individual(**kwargs):
'json_ext': json_ext
})
else:
new_beneficiaries = []
for individual in individuals_to_upload:
# Create a new Beneficiary instance
beneficiary = Beneficiary(
individual=individual,
benefit_plan_id=benefit_plan_id,
status=status,
json_ext=individual.json_ext
json_ext=individual.json_ext,
user_created=user,
user_updated=user,
uuid=uuid.uuid4(),
)
try:
beneficiary.save(username=user.username)
except ValidationError as e:
logger.error(f"Validation error occurred: {e}")
new_beneficiaries.append(beneficiary)
try:
Beneficiary.objects.bulk_create(new_beneficiaries)
except ValidationError as e:
logger.error(f"Validation error occurred: {e}")
38 changes: 26 additions & 12 deletions social_protection/signals/on_validation_import_valid_items.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import uuid
from django.core.exceptions import ValidationError
from typing import List

Expand Down Expand Up @@ -68,6 +69,7 @@ def __init__(self, workflow: str, upload_record, upload_id: str, benefit_plan: B

def on_task_complete_action(business_event, **kwargs):
from social_protection.apps import SocialProtectionConfig
from social_protection.services import BeneficiaryImportService

result = kwargs.get('result')
if not result or not result.get('success'):
Expand Down Expand Up @@ -96,37 +98,49 @@ def on_task_complete_action(business_event, **kwargs):
individualdatasource__upload_id=data['task']['json_ext']['data_upload_id']
)
user = User.objects.get(id=data['user']['id'])
new_beneficiaries = []
for individual in individuals_to_enroll:
# Create a new Beneficiary instance
beneficiary = Beneficiary(
individual=individual,
benefit_plan_id=data['task']['json_ext']['benefit_plan_id'],
status=data['task']['json_ext']['beneficiary_status'],
json_ext=individual.json_ext
json_ext=individual.json_ext,
user_created=user,
user_updated=user,
uuid=uuid.uuid4(),
)
try:
beneficiary.save(username=user.username)
except ValidationError as e:
logger.error(f"Validation error occurred: {e}")
new_beneficiaries.append(beneficiary)
try:
Beneficiary.objects.bulk_create(new_beneficiaries)
BeneficiaryImportService(user).synchronize_data_for_reporting(
upload_id=data['task']['json_ext']['data_upload_id'],
benefit_plan=data['task']['json_ext']['benefit_plan_id']
)
except ValidationError as e:
logger.error(f"Validation error occurred: {e}")
return
elif business_event == SocialProtectionConfig.validation_group_enrollment:
head_groups_to_enroll = Individual.objects.filter(
individualdatasource__upload_id=data['task']['json_ext']['data_upload_id']
)
user = User.objects.get(id=data['user']['id'])
new_group_beneficiaries = []
for head_individual in head_groups_to_enroll:
# Create a new Beneficiary instance
group_individual_head = GroupIndividual.objects.filter(individual=head_individual).first()
group_beneficiary = GroupBeneficiary(
group=group_individual_head.group,
benefit_plan_id=data['task']['json_ext']['benefit_plan_id'],
status=data['task']['json_ext']['beneficiary_status'],
json_ext=head_individual.json_ext
json_ext=head_individual.json_ext,
user_created=user,
user_updated=user,
uuid=uuid.uuid4(),
)
try:
group_beneficiary.save(username=user.username)
except ValidationError as e:
logger.error(f"Validation error occurred: {e}")
new_group_beneficiaries.append(group_beneficiary)
try:
GroupBeneficiary.objects.bulk_create(new_group_beneficiaries)
except ValidationError as e:
logger.error(f"Validation error occurred: {e}")
return
else:
raise ValueError(f"Business event {business_event} doesn't have assigned workflow.")
Expand Down

0 comments on commit b7c5359

Please sign in to comment.