Added python workflow for data upload #56

Merged
merged 2 commits on Feb 16, 2024
10 changes: 10 additions & 0 deletions README.md
@@ -361,3 +361,13 @@ report containing the invalid items. After reviewing the report, the user can pr
The report containing invalid items can be downloaded from the upload history on the benefit plan page.
* When a user accepts the valid items from an import that faced issues with some invalid items and there are no errors in this workflow,
the status of the import is marked as `PARTIAL_SUCCESS`. This triggers the `beneficiary-import-valid-items` workflow in such cases.


### Enabling Python Workflows
The module ships with simple Python workflows for data upload.
They are intended for development purposes and should not be used in a production environment.
To activate these Python workflows, a configuration change is required:
set the `enable_python_workflows` parameter to `true` in the module config (see the example below).

Workflows:
* beneficiary upload
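
For illustration, a minimal sketch of the relevant configuration entry. The key name comes from the module's `DEFAULT_CONFIG`; how the override is actually supplied (for example via a `ModuleConfiguration` record) depends on the deployment and is only assumed here:

```python
# Illustrative only: the flag that enables the bundled Python workflows.
# In a deployed instance this value would come from the social_protection
# module configuration rather than being hard-coded.
MODULE_CONFIG = {
    "enable_python_workflows": True,  # development/testing environments only
}
```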
23 changes: 22 additions & 1 deletion social_protection/apps.py
@@ -24,7 +24,9 @@
"validation_calculation_uuid": "4362f958-5894-435b-9bda-df6cadf88352",
"validation_import_valid_items": "validation.import_valid_items",
"validation_download_invalid_items": "validation.download_invalid_items",
"validation_import_valid_items_workflow": "beneficiary-import-valid-items"
"validation_import_valid_items_workflow": "beneficiary-import-valid-items",

"enable_python_workflows": False
}


@@ -54,11 +56,30 @@ class SocialProtectionConfig(AppConfig):
validation_download_invalid_items = None
validation_import_valid_items_workflow = None

enable_python_workflows = None

def ready(self):
from core.models import ModuleConfiguration

cfg = ModuleConfiguration.get_or_default(self.name, DEFAULT_CONFIG)
self.__load_config(cfg)
self._set_up_workflows()

def _set_up_workflows(self):
from workflow.systems.python import PythonWorkflowAdaptor
from social_protection.workflows import process_import_beneficiaries_workflow

if self.enable_python_workflows:
PythonWorkflowAdaptor.register_workflow(
'Python Beneficiaries Upload',
'socialProtection',
process_import_beneficiaries_workflow
)

# Replace the default valid-items workflow with the Python one
if self.enable_python_workflows is True and \
self.validation_import_valid_items_workflow == DEFAULT_CONFIG['validation_import_valid_items_workflow']:
self.validation_import_valid_items_workflow = 'Python Beneficiaries Upload'

@classmethod
def __load_config(cls, cfg):
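For context, registering an additional Python workflow would follow the same pattern as `_set_up_workflows` above. The workflow name and body below are hypothetical; only `PythonWorkflowAdaptor.register_workflow` and its argument order are taken from the diff:

```python
# Hypothetical sketch of registering another Python workflow for this module.
from workflow.systems.python import PythonWorkflowAdaptor


def my_custom_upload_workflow(user_uuid, benefit_plan_uuid, upload_uuid):
    """Placeholder body; a real workflow would process the upload here."""
    ...


PythonWorkflowAdaptor.register_workflow(
    'Python Custom Upload',     # workflow name
    'socialProtection',         # workflow group
    my_custom_upload_workflow   # callable executed when the workflow runs
)
```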
54 changes: 35 additions & 19 deletions social_protection/services.py
@@ -1,4 +1,5 @@
import copy
import json
import logging
import uuid

@@ -21,13 +22,15 @@
BenefitPlanDataUploadRecords,
GroupBeneficiary
)
from social_protection.utils import load_dataframe
from social_protection.validation import (
BeneficiaryValidation,
BenefitPlanValidation, GroupBeneficiaryValidation
)
from tasks_management.services import UpdateCheckerLogicServiceMixin, CheckerLogicServiceMixin, \
crud_business_data_builder
from workflow.systems.base import WorkflowHandler
from workflow.util import result as WorkflowExecutionResult
from core.models import User

logger = logging.getLogger(__name__)
@@ -124,18 +127,23 @@ def __init__(self, user):
super().__init__()
self.user = user

@transaction.atomic
@register_service_signal('benefit_plan.import_beneficiaries')
def import_beneficiaries(self,
import_file: InMemoryUploadedFile,
benefit_plan: BenefitPlan,
workflow: WorkflowHandler):
upload = self._save_sources(import_file)
self._trigger_workflow(workflow, upload, benefit_plan)
return {'success': True, 'data': {'upload_uuid': upload.uuid}}

@transaction.atomic
def _save_sources(self, import_file):
# Kept as a separate method because workflow execution must be independent of the atomic transaction.
upload = self._create_upload_entry(import_file.name)
dataframe = self._load_import_file(import_file)
self._validate_dataframe(dataframe)
self._save_data_source(dataframe, upload)
self._trigger_workflow(workflow, upload, benefit_plan)
return {'success': True, 'data': {'upload_uuid': upload.uuid}}
return upload

def validate_import_beneficiaries(self, upload_id: uuid, individual_sources, benefit_plan: BenefitPlan):
dataframe = self._load_dataframe(individual_sources)
@@ -146,6 +154,7 @@ def validate_import_beneficiaries(self, upload_id: uuid, individual_sources, ben
)
return {'success': True, 'data': validated_dataframe, 'summary_invalid_items': invalid_items}


def create_task_with_importing_valid_items(self, upload_id: uuid, benefit_plan: BenefitPlan):
self._create_import_valid_items_task(benefit_plan, upload_id, self.user)

@@ -226,30 +235,37 @@ def _save_data_source(self, dataframe: pd.DataFrame, upload: IndividualDataSourc
dataframe.apply(self._save_row, axis='columns', args=(upload,))

def _save_row(self, row, upload):
ds = IndividualDataSource(upload=upload, json_ext=row.to_dict(), validations={})
ds = IndividualDataSource(upload=upload, json_ext=json.loads(row.to_json()), validations={})
ds.save(username=self.user.login_name)

def _load_dataframe(self, individual_sources) -> pd.DataFrame:
data_from_source = []
for individual_source in individual_sources:
json_ext = individual_source.json_ext
individual_source.json_ext["id"] = individual_source.id
data_from_source.append(json_ext)
recreated_df = pd.DataFrame(data_from_source)
return recreated_df
return load_dataframe(individual_sources)

def _trigger_workflow(self,
workflow: WorkflowHandler,
upload: IndividualDataSourceUpload,
benefit_plan: BenefitPlan):
workflow.run({
# Core user UUID required
'user_uuid': str(User.objects.get(username=self.user.login_name).id),
'benefit_plan_uuid': str(benefit_plan.uuid),
'upload_uuid': str(upload.uuid),
})
upload.status = IndividualDataSourceUpload.Status.TRIGGERED
upload.save(username=self.user.login_name)
try:
# Set the status before the run to avoid race conditions
upload.status = IndividualDataSourceUpload.Status.TRIGGERED
upload.save(username=self.user.login_name)

result = workflow.run({
# Core user UUID required
'user_uuid': str(User.objects.get(username=self.user.login_name).id),
'benefit_plan_uuid': str(benefit_plan.uuid),
'upload_uuid': str(upload.uuid),
})

# These checks are a safety measure for workflows. Handlers such as PythonHandler or LightningHandler
# usually follow this pattern, but the return type is not fixed by the abstract workflow.run().
if result and isinstance(result, dict) and result.get('success') is False:
raise ValueError(result.get('message', 'Unexpected error during the workflow execution'))
except ValueError as e:
upload.status = IndividualDataSourceUpload.Status.FAIL
upload.error = {'workflow': str(e)}
upload.save(username=self.user.login_name)
return upload

def __save_validation_error_in_data_source(self, row, field_validation):
error_fields = []
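As a usage sketch, the import service is driven as below. The call shape and return value match `import_beneficiaries` in the diff above; `user`, `import_file`, `benefit_plan` and `workflow` are assumed to be provided by the caller (for example a GraphQL mutation):

```python
# Hypothetical glue code calling the import service.
service = BeneficiaryImportService(user)
result = service.import_beneficiaries(import_file, benefit_plan, workflow)
if result['success']:
    print('Upload triggered:', result['data']['upload_uuid'])
```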
15 changes: 15 additions & 0 deletions social_protection/utils.py
@@ -0,0 +1,15 @@
from typing import Iterable

import pandas as pd

from individual.models import IndividualDataSource


def load_dataframe(individual_sources: Iterable[IndividualDataSource]) -> pd.DataFrame:
data_from_source = []
for individual_source in individual_sources:
json_ext = individual_source.json_ext
individual_source.json_ext["id"] = individual_source.id
data_from_source.append(json_ext)
recreated_df = pd.DataFrame(data_from_source)
return recreated_df
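A minimal usage sketch, assuming an existing `upload_uuid` (placeholder value); the queryset filter matches the one used by the workflow further below:

```python
# Hypothetical: rebuild the uploaded rows as a DataFrame for validation.
sources = IndividualDataSource.objects.filter(upload_id=upload_uuid)
df = load_dataframe(sources)
print(df.columns)  # uploaded column headers plus the source row "id"
```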
23 changes: 23 additions & 0 deletions social_protection/workflows/__init__.py
@@ -0,0 +1,23 @@
"""
Overview

The Python workflows detailed here are designed to facilitate the validation and processing
of beneficiary upload data within a development environment.
These workflows implement a series of steps to ensure that uploaded data meets specific criteria,
both in terms of content and structure, before proceeding with database operations.

Important Note: These workflows are intended for use in a development (dev) environment.
They are executed in a single-threaded manner, which may not be efficient for processing larger batches of data.
Their primary purpose is to illustrate the concept of beneficiary upload workflows and similar processes.

Development Use and Efficiency Considerations

Single-Threaded Execution: Given that these workflows operate in a single-threaded context,
they may exhibit limitations in processing speed and efficiency when handling large datasets.

Development Environment Application: It is recommended to utilize these workflows within
a dev environment to explore and understand the underlying concepts of beneficiary data upload and processing.
They serve as a foundational guide for developing more robust, production-ready solutions.

"""
from social_protection.workflows.base_beneficiary_upload import process_import_beneficiaries_workflow
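For a quick manual test in a development shell, the workflow can also be invoked directly. The UUIDs below are placeholders; the argument names match the function defined in `base_beneficiary_upload.py`:

```python
# Hypothetical direct invocation, e.g. from `./manage.py shell` in a dev environment.
from social_protection.workflows import process_import_beneficiaries_workflow

process_import_beneficiaries_workflow(
    user_uuid='11111111-1111-1111-1111-111111111111',          # core User id
    benefit_plan_uuid='22222222-2222-2222-2222-222222222222',  # BenefitPlan uuid
    upload_uuid='33333333-3333-3333-3333-333333333333',        # IndividualDataSourceUpload uuid
)
```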
180 changes: 180 additions & 0 deletions social_protection/workflows/base_beneficiary_upload.py
@@ -0,0 +1,180 @@
import json
import logging

from django.db import connection
from django.db import ProgrammingError

from core.models import User
from individual.models import IndividualDataSource
from social_protection.models import BenefitPlan
from social_protection.services import BeneficiaryImportService
from social_protection.utils import load_dataframe
from workflow.exceptions import PythonWorkflowHandlerException

logger = logging.getLogger(__name__)


def validate_dataframe_headers(df, schema):
"""
Validates if DataFrame headers:
1. Are included in the JSON schema properties.
2. Include 'first_name', 'last_name', and 'dob'.
"""
df_headers = set(df.columns)
schema_properties = set(schema.get('properties', {}).keys())
required_headers = {'first_name', 'last_name', 'dob', 'id'}

errors = []
if not (df_headers-required_headers).issubset(schema_properties):
invalid_headers = df_headers - schema_properties - required_headers
errors.append(
F"Uploaded beneficiaries contains invalid columns: {invalid_headers}"
)

for field in required_headers:
if field not in df_headers:
errors.append(
F"Uploaded beneficiaries missing essential header: {field}"
)

if errors:
raise PythonWorkflowHandlerException("\n".join(errors))


def process_import_beneficiaries_workflow(user_uuid, benefit_plan_uuid, upload_uuid):
# Call the records validation service directly with the provided arguments
user = User.objects.get(id=user_uuid)
import_service = BeneficiaryImportService(user)
benefit_plan = BenefitPlan.objects.filter(uuid=benefit_plan_uuid, is_deleted=False).first()
schema = benefit_plan.beneficiary_data_schema
df = load_dataframe(IndividualDataSource.objects.filter(upload_id=upload_uuid))

# Valid headers are a necessary condition for the whole update. If the file is invalid, the
# upload is aborted because no record can be uploaded.
validate_dataframe_headers(df, schema)

validation_response = import_service.validate_import_beneficiaries(
upload_id=upload_uuid,
individual_sources=IndividualDataSource.objects.filter(upload_id=upload_uuid),
benefit_plan=benefit_plan
)

try:
if validation_response['summary_invalid_items']:
# If some records were not validated, call the task creation service
import_service.create_task_with_importing_valid_items(upload_uuid, benefit_plan)
else:
# All records are fine, execute SQL logic
execute_sql_logic(upload_uuid, user_uuid, benefit_plan_uuid)
except ProgrammingError as e:
# The exception on procedure execution is handled by the procedure itself.
logger.log(logging.WARNING, F'Error during beneficiary upload workflow, details:\n{str(e)}')
return
except Exception as e:
raise PythonWorkflowHandlerException(str(e))


def execute_sql_logic(upload_uuid, user_uuid, benefit_plan_uuid):
with connection.cursor() as cursor:
current_upload_id = upload_uuid
userUUID = user_uuid
benefitPlan = benefit_plan_uuid
# The SQL logic here needs to be carefully translated or executed directly
# The provided SQL is complex and may require breaking down into multiple steps or ORM operations
cursor.execute("""
DO $$
DECLARE
current_upload_id UUID := %s::UUID;
userUUID UUID := %s::UUID;
benefitPlan UUID := %s::UUID;
failing_entries UUID[];
json_schema jsonb;
failing_entries_invalid_json UUID[];
failing_entries_first_name UUID[];
failing_entries_last_name UUID[];
failing_entries_dob UUID[];
BEGIN
-- Check if all required fields are present in the entries
SELECT ARRAY_AGG("UUID") INTO failing_entries_first_name
FROM individual_individualdatasource
WHERE upload_id=current_upload_id and individual_id is null and "isDeleted"=False AND NOT "Json_ext" ? 'first_name';

SELECT ARRAY_AGG("UUID") INTO failing_entries_last_name
FROM individual_individualdatasource
WHERE upload_id=current_upload_id and individual_id is null and "isDeleted"=False AND NOT "Json_ext" ? 'last_name';

SELECT ARRAY_AGG("UUID") INTO failing_entries_dob
FROM individual_individualdatasource
WHERE upload_id=current_upload_id and individual_id is null and "isDeleted"=False AND NOT "Json_ext" ? 'dob';


-- Check if any entries have invalid Json_ext according to the schema
SELECT beneficiary_data_schema INTO json_schema FROM social_protection_benefitplan WHERE "UUID" = benefitPlan;
SELECT ARRAY_AGG("UUID") INTO failing_entries_invalid_json
FROM individual_individualdatasource
WHERE upload_id=current_upload_id and individual_id is null and "isDeleted"=False AND NOT validate_json_schema(json_schema, "Json_ext");

-- If any entries do not meet the criteria or missing required fields, set the error message in the upload table and do not proceed further
IF failing_entries_invalid_json IS NOT NULL or failing_entries_first_name IS NOT NULL OR failing_entries_last_name IS NOT NULL OR failing_entries_dob IS NOT NULL THEN
UPDATE individual_individualdatasourceupload
SET error = coalesce(error, '{}'::jsonb) || jsonb_build_object('errors', jsonb_build_object(
'error', 'Invalid entries',
'timestamp', NOW()::text,
'upload_id', current_upload_id::text,
'failing_entries_first_name', failing_entries_first_name,
'failing_entries_last_name', failing_entries_last_name,
'failing_entries_dob', failing_entries_dob,
'failing_entries_invalid_json', failing_entries_invalid_json
))
WHERE "UUID" = current_upload_id;

update individual_individualdatasourceupload set status='FAIL' where "UUID" = current_upload_id;
-- If no invalid entries, then proceed with the data manipulation
ELSE
BEGIN
WITH new_entry AS (
INSERT INTO individual_individual(
"UUID", "isDeleted", version, "UserCreatedUUID", "UserUpdatedUUID",
"Json_ext", first_name, last_name, dob
)
SELECT gen_random_uuid(), false, 1, userUUID, userUUID,
"Json_ext", "Json_ext"->>'first_name', "Json_ext" ->> 'last_name', to_date("Json_ext" ->> 'dob', 'YYYY-MM-DD')
FROM individual_individualdatasource
WHERE upload_id=current_upload_id and individual_id is null and "isDeleted"=False
RETURNING "UUID", "Json_ext" -- also return the Json_ext
)
UPDATE individual_individualdatasource
SET individual_id = new_entry."UUID"
FROM new_entry
WHERE upload_id=current_upload_id
and individual_id is null
and "isDeleted"=False
and individual_individualdatasource."Json_ext" = new_entry."Json_ext"; -- match on Json_ext


with new_entry_2 as (INSERT INTO social_protection_beneficiary(
"UUID", "isDeleted", "Json_ext", "DateCreated", "DateUpdated", version, "DateValidFrom", "DateValidTo", status, "benefit_plan_id", "individual_id", "UserCreatedUUID", "UserUpdatedUUID"
)
SELECT gen_random_uuid(), false, iids."Json_ext" - 'first_name' - 'last_name' - 'dob', NOW(), NOW(), 1, NOW(), NULL, 'POTENTIAL', benefitPlan, new_entry."UUID", userUUID, userUUID
FROM individual_individualdatasource iids right join individual_individual new_entry on new_entry."UUID" = iids.individual_id
WHERE iids.upload_id=current_upload_id and iids."isDeleted"=false
returning "UUID")


update individual_individualdatasourceupload set status='SUCCESS', error='{}' where "UUID" = current_upload_id;
EXCEPTION
WHEN OTHERS then

update individual_individualdatasourceupload set status='FAIL' where "UUID" = current_upload_id;
UPDATE individual_individualdatasourceupload
SET error = coalesce(error, '{}'::jsonb) || jsonb_build_object('errors', jsonb_build_object(
'error', SQLERRM,
'timestamp', NOW()::text,
'upload_id', current_upload_id::text
))
WHERE "UUID" = current_upload_id;
END;
END IF;
END $$;
""", [current_upload_id, userUUID, benefitPlan])
# Process the cursor results or handle exceptions