diff --git a/README.md b/README.md
index 9f8204a..7eaffe1 100644
--- a/README.md
+++ b/README.md
@@ -361,3 +361,13 @@ report containing the invalid items. After reviewing the report, the user can pr
 The report containing invalid items can be downloaded from the upload history on the benefit plan page.
 * When a user accepts the valid items from an import that faced issues with some invalid items and there are no errors in this workflow, the status of the import is marked as `PARTIAL_SUCCESS`. This triggers the `beneficiary-import-valid-items` workflow in such cases.
+
+
+### Enabling Python Workflows
+The module comes with simple workflows for data upload.
+They are intended for development purposes, not for production environments.
+To activate these Python workflows, a configuration change is required.
+Specifically, set the `enable_python_workflows` parameter to `true` in the module config.
+
+Workflows:
+ * beneficiary upload
\ No newline at end of file
diff --git a/social_protection/apps.py b/social_protection/apps.py
index b2a6d8b..1fbccfe 100644
--- a/social_protection/apps.py
+++ b/social_protection/apps.py
@@ -24,7 +24,9 @@
     "validation_calculation_uuid": "4362f958-5894-435b-9bda-df6cadf88352",
     "validation_import_valid_items": "validation.import_valid_items",
     "validation_download_invalid_items": "validation.download_invalid_items",
-    "validation_import_valid_items_workflow": "beneficiary-import-valid-items"
+    "validation_import_valid_items_workflow": "beneficiary-import-valid-items",
+
+    "enable_python_workflows": False
 }
 
 
@@ -54,11 +56,30 @@ class SocialProtectionConfig(AppConfig):
     validation_download_invalid_items = None
     validation_import_valid_items_workflow = None
 
+    enable_python_workflows = None
+
     def ready(self):
         from core.models import ModuleConfiguration
         cfg = ModuleConfiguration.get_or_default(self.name, DEFAULT_CONFIG)
         self.__load_config(cfg)
+        self._set_up_workflows()
+
+    def _set_up_workflows(self):
+        from workflow.systems.python import PythonWorkflowAdaptor
+        from social_protection.workflows import process_import_beneficiaries_workflow
+
+        if self.enable_python_workflows:
+            PythonWorkflowAdaptor.register_workflow(
+                'Python Beneficiaries Upload',
+                'socialProtection',
+                process_import_beneficiaries_workflow
+            )
+
+        # Replace the default valid-items workflow with the Python one
+        if self.enable_python_workflows is True and \
+                self.validation_import_valid_items_workflow == DEFAULT_CONFIG['validation_import_valid_items_workflow']:
+            self.validation_import_valid_items_workflow = 'Python Beneficiaries Upload'
 
     @classmethod
     def __load_config(cls, cfg):
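As the README section and the `DEFAULT_CONFIG` entry above indicate, the Python upload workflows are switched on through the module configuration that `SocialProtectionConfig.ready` loads via `ModuleConfiguration.get_or_default`. Below is a minimal sketch of the relevant override, assuming the configuration is supplied as a plain dict for the `social_protection` module; the variable name is illustrative only, and all other `DEFAULT_CONFIG` keys keep their defaults.

```python
# Sketch only: configuration override enabling the development-only Python workflows.
SOCIAL_PROTECTION_CONFIG_OVERRIDE = {
    # When True, _set_up_workflows registers 'Python Beneficiaries Upload' via
    # PythonWorkflowAdaptor and, as long as validation_import_valid_items_workflow
    # still equals its default below, swaps it for the Python workflow.
    "enable_python_workflows": True,
    "validation_import_valid_items_workflow": "beneficiary-import-valid-items",
}
```

With the flag left at its default `False`, nothing is registered and the module keeps using whatever handler the `beneficiary-import-valid-items` name resolves to.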
diff --git a/social_protection/services.py b/social_protection/services.py
index 7543ab3..f6233ab 100644
--- a/social_protection/services.py
+++ b/social_protection/services.py
@@ -1,4 +1,5 @@
 import copy
+import json
 import logging
 import uuid
 
@@ -21,6 +22,7 @@
     BenefitPlanDataUploadRecords, GroupBeneficiary
 )
+from social_protection.utils import load_dataframe
 from social_protection.validation import (
     BeneficiaryValidation, BenefitPlanValidation, GroupBeneficiaryValidation
@@ -28,6 +30,7 @@
 from tasks_management.services import UpdateCheckerLogicServiceMixin, CheckerLogicServiceMixin, \
     crud_business_data_builder
 from workflow.systems.base import WorkflowHandler
+from workflow.util import result as WorkflowExecutionResult
 from core.models import User
 
 logger = logging.getLogger(__name__)
@@ -124,18 +127,23 @@
     def __init__(self, user):
         super().__init__()
         self.user = user
 
-    @transaction.atomic
     @register_service_signal('benefit_plan.import_beneficiaries')
     def import_beneficiaries(self, import_file: InMemoryUploadedFile, benefit_plan: BenefitPlan,
                              workflow: WorkflowHandler):
+        upload = self._save_sources(import_file)
+        self._trigger_workflow(workflow, upload, benefit_plan)
+        return {'success': True, 'data': {'upload_uuid': upload.uuid}}
+
+    @transaction.atomic
+    def _save_sources(self, import_file):
+        # Separated into its own method so that workflow execution stays outside the atomic transaction.
         upload = self._create_upload_entry(import_file.name)
         dataframe = self._load_import_file(import_file)
         self._validate_dataframe(dataframe)
         self._save_data_source(dataframe, upload)
-        self._trigger_workflow(workflow, upload, benefit_plan)
-        return {'success': True, 'data': {'upload_uuid': upload.uuid}}
+        return upload
 
     def validate_import_beneficiaries(self, upload_id: uuid, individual_sources, benefit_plan: BenefitPlan):
         dataframe = self._load_dataframe(individual_sources)
@@ -146,6 +154,7 @@
         )
         return {'success': True, 'data': validated_dataframe, 'summary_invalid_items': invalid_items}
 
+
     def create_task_with_importing_valid_items(self, upload_id: uuid, benefit_plan: BenefitPlan):
         self._create_import_valid_items_task(benefit_plan, upload_id, self.user)
 
@@ -226,30 +235,37 @@ def _save_data_source(self, dataframe: pd.DataFrame, upload: IndividualDataSourc
         dataframe.apply(self._save_row, axis='columns', args=(upload,))
 
     def _save_row(self, row, upload):
-        ds = IndividualDataSource(upload=upload, json_ext=row.to_dict(), validations={})
+        ds = IndividualDataSource(upload=upload, json_ext=json.loads(row.to_json()), validations={})
         ds.save(username=self.user.login_name)
 
     def _load_dataframe(self, individual_sources) -> pd.DataFrame:
-        data_from_source = []
-        for individual_source in individual_sources:
-            json_ext = individual_source.json_ext
-            individual_source.json_ext["id"] = individual_source.id
-            data_from_source.append(json_ext)
-        recreated_df = pd.DataFrame(data_from_source)
-        return recreated_df
+        return load_dataframe(individual_sources)
 
     def _trigger_workflow(self, workflow: WorkflowHandler, upload: IndividualDataSourceUpload,
                           benefit_plan: BenefitPlan):
-        workflow.run({
-            # Core user UUID required
-            'user_uuid': str(User.objects.get(username=self.user.login_name).id),
-            'benefit_plan_uuid': str(benefit_plan.uuid),
-            'upload_uuid': str(upload.uuid),
-        })
-        upload.status = IndividualDataSourceUpload.Status.TRIGGERED
-        upload.save(username=self.user.login_name)
+        try:
+            # Set the status before the run in order to avoid race conditions
+            upload.status = IndividualDataSourceUpload.Status.TRIGGERED
+            upload.save(username=self.user.login_name)
+
+            result = workflow.run({
+                # Core user UUID required
+                'user_uuid': str(User.objects.get(username=self.user.login_name).id),
+                'benefit_plan_uuid': str(benefit_plan.uuid),
+                'upload_uuid': str(upload.uuid),
+            })
+
+            # This check is a safety measure for workflows. Handlers such as PythonHandler or LightningHandler
+            # should follow this pattern, but the return type is not fixed by the workflow.run abstraction.
+            if result and isinstance(result, dict) and result.get('success') is False:
+                raise ValueError(result.get('message', 'Unexpected error during the workflow execution'))
+        except ValueError as e:
+            upload.status = IndividualDataSourceUpload.Status.FAIL
+            upload.error = {'workflow': str(e)}
+            upload.save(username=self.user.login_name)
+        return upload
 
     def __save_validation_error_in_data_source(self, row, field_validation):
         error_fields = []
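The reworked `_trigger_workflow` above marks the upload as `TRIGGERED` before running the handler and only downgrades it to `FAIL` when the handler reports a failure. A minimal stand-in handler, illustrative only and not part of the module (real handlers are resolved through the `workflow` app), showing the dict convention the guard expects:

```python
# Illustrative stand-in only; real handlers come from the workflow module.
class AlwaysFailingHandler:
    def run(self, payload: dict) -> dict:
        # A success=False result makes _trigger_workflow raise ValueError internally,
        # set the upload status to FAIL and store this message under upload.error['workflow'].
        return {
            'success': False,
            'message': f"Could not process upload {payload['upload_uuid']}",
        }
```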
diff --git a/social_protection/utils.py b/social_protection/utils.py
new file mode 100644
index 0000000..ca3b4e9
--- /dev/null
+++ b/social_protection/utils.py
@@ -0,0 +1,15 @@
+from typing import Iterable
+
+import pandas as pd
+
+from individual.models import IndividualDataSource
+
+
+def load_dataframe(individual_sources: Iterable[IndividualDataSource]) -> pd.DataFrame:
+    data_from_source = []
+    for individual_source in individual_sources:
+        json_ext = individual_source.json_ext
+        individual_source.json_ext["id"] = individual_source.id
+        data_from_source.append(json_ext)
+    recreated_df = pd.DataFrame(data_from_source)
+    return recreated_df
diff --git a/social_protection/workflows/__init__.py b/social_protection/workflows/__init__.py
new file mode 100644
index 0000000..00815e1
--- /dev/null
+++ b/social_protection/workflows/__init__.py
@@ -0,0 +1,23 @@
+"""
+Overview
+
+The Python workflows detailed here are designed to facilitate the validation and processing
+of beneficiary upload data within a development environment.
+These workflows implement a series of steps to ensure that uploaded data meets specific criteria,
+both in terms of content and structure, before proceeding with database operations.
+
+Important Note: These workflows are intended for use in a development (dev) environment.
+They are executed in a single-threaded manner, which may not be efficient for processing larger batches of data.
+Their primary purpose is to illustrate the concept of beneficiary upload workflows and similar processes.
+
+Development Use and Efficiency Considerations
+
+    Single-Threaded Execution: Given that these workflows operate in a single-threaded context,
+    they may exhibit limitations in processing speed and efficiency when handling large datasets.
+
+    Development Environment Application: It is recommended to utilize these workflows within
+    a dev environment to explore and understand the underlying concepts of beneficiary data upload and processing.
+    They serve as a foundational guide for developing more robust, production-ready solutions.
+
+"""
+from social_protection.workflows.base_beneficiary_upload import process_import_beneficiaries_workflow
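For orientation, the `load_dataframe` helper added in `social_protection/utils.py` above simply flattens the `json_ext` payload of each `IndividualDataSource` row into a pandas DataFrame and injects the source row's `id`. A rough equivalent on stand-in objects (no database access, names are illustrative) behaves like this:

```python
import pandas as pd

# Stand-ins for IndividualDataSource rows: only json_ext and id matter for load_dataframe.
class FakeSource:
    def __init__(self, pk, json_ext):
        self.id = pk
        self.json_ext = json_ext

sources = [
    FakeSource(1, {"first_name": "Jan", "last_name": "Kowalski", "dob": "1990-01-01"}),
    FakeSource(2, {"first_name": "Anna", "last_name": "Nowak", "dob": "1985-05-12"}),
]

# Mirrors what load_dataframe does: each json_ext becomes one row, with the source id injected.
df = pd.DataFrame([dict(s.json_ext, id=s.id) for s in sources])
print(list(df.columns))  # ['first_name', 'last_name', 'dob', 'id']
```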
diff --git a/social_protection/workflows/base_beneficiary_upload.py b/social_protection/workflows/base_beneficiary_upload.py
new file mode 100644
index 0000000..3134e6a
--- /dev/null
+++ b/social_protection/workflows/base_beneficiary_upload.py
@@ -0,0 +1,180 @@
+import json
+import logging
+
+from django.db import connection
+from django.db import ProgrammingError
+
+from core.models import User
+from individual.models import IndividualDataSource
+from social_protection.models import BenefitPlan
+from social_protection.services import BeneficiaryImportService
+from social_protection.utils import load_dataframe
+from workflow.exceptions import PythonWorkflowHandlerException
+
+logger = logging.getLogger(__name__)
+
+
+def validate_dataframe_headers(df, schema):
+    """
+    Validates that the DataFrame headers:
+    1. Are included in the JSON schema properties.
+    2. Include 'first_name', 'last_name', and 'dob'.
+    """
+    df_headers = set(df.columns)
+    schema_properties = set(schema.get('properties', {}).keys())
+    required_headers = {'first_name', 'last_name', 'dob', 'id'}
+
+    errors = []
+    if not (df_headers - required_headers).issubset(schema_properties):
+        invalid_headers = df_headers - schema_properties - required_headers
+        errors.append(
+            F"Uploaded beneficiaries contain invalid columns: {invalid_headers}"
+        )
+
+    for field in required_headers:
+        if field not in df_headers:
+            errors.append(
+                F"Uploaded beneficiaries are missing the essential header: {field}"
+            )
+
+    if errors:
+        raise PythonWorkflowHandlerException("\n".join(errors))
+
+
+def process_import_beneficiaries_workflow(user_uuid, benefit_plan_uuid, upload_uuid):
+    # Call the records validation service directly with the provided arguments
+    user = User.objects.get(id=user_uuid)
+    import_service = BeneficiaryImportService(user)
+    benefit_plan = BenefitPlan.objects.filter(uuid=benefit_plan_uuid, is_deleted=False).first()
+    schema = benefit_plan.beneficiary_data_schema
+    df = load_dataframe(IndividualDataSource.objects.filter(upload_id=upload_uuid))
+
+    # Valid headers are a necessary condition for the whole update. If the file is invalid,
+    # the upload is aborted because no record can be uploaded.
+    validate_dataframe_headers(df, schema)
+
+    validation_response = import_service.validate_import_beneficiaries(
+        upload_id=upload_uuid,
+        individual_sources=IndividualDataSource.objects.filter(upload_id=upload_uuid),
+        benefit_plan=benefit_plan
+    )
+
+    try:
+        if validation_response['summary_invalid_items']:
+            # If some records were not validated, call the task creation service
+            import_service.create_task_with_importing_valid_items(upload_uuid, benefit_plan)
+        else:
+            # All records are fine, execute the SQL logic
+            execute_sql_logic(upload_uuid, user_uuid, benefit_plan_uuid)
+    except ProgrammingError as e:
+        # An exception during procedure execution is handled by the procedure itself.
+        logger.warning(F'Error during beneficiary upload workflow, details:\n{str(e)}')
+        return
+    except Exception as e:
+        raise PythonWorkflowHandlerException(str(e))
+
+
+def execute_sql_logic(upload_uuid, user_uuid, benefit_plan_uuid):
+    with connection.cursor() as cursor:
+        current_upload_id = upload_uuid
+        userUUID = user_uuid
+        benefitPlan = benefit_plan_uuid
+        # The SQL logic here needs to be carefully translated or executed directly.
+        # The provided SQL is complex and may require breaking it down into multiple steps or ORM operations.
+        cursor.execute("""
+DO $$
+    DECLARE
+        current_upload_id UUID := %s::UUID;
+        userUUID UUID := %s::UUID;
+        benefitPlan UUID := %s::UUID;
+        failing_entries UUID[];
+        json_schema jsonb;
+        failing_entries_invalid_json UUID[];
+        failing_entries_first_name UUID[];
+        failing_entries_last_name UUID[];
+        failing_entries_dob UUID[];
+    BEGIN
+        -- Check if all required fields are present in the entries
+        SELECT ARRAY_AGG("UUID") INTO failing_entries_first_name
+        FROM individual_individualdatasource
+        WHERE upload_id=current_upload_id and individual_id is null and "isDeleted"=False AND NOT "Json_ext" ? 'first_name';
+
+        SELECT ARRAY_AGG("UUID") INTO failing_entries_last_name
+        FROM individual_individualdatasource
+        WHERE upload_id=current_upload_id and individual_id is null and "isDeleted"=False AND NOT "Json_ext" ? 'last_name';
+
+        SELECT ARRAY_AGG("UUID") INTO failing_entries_dob
+        FROM individual_individualdatasource
+        WHERE upload_id=current_upload_id and individual_id is null and "isDeleted"=False AND NOT "Json_ext" ? 'dob';
+
+
+        -- Check if any entries have invalid Json_ext according to the schema
+        SELECT beneficiary_data_schema INTO json_schema FROM social_protection_benefitplan WHERE "UUID" = benefitPlan;
+        SELECT ARRAY_AGG("UUID") INTO failing_entries_invalid_json
+        FROM individual_individualdatasource
+        WHERE upload_id=current_upload_id and individual_id is null and "isDeleted"=False AND NOT validate_json_schema(json_schema, "Json_ext");
+
+        -- If any entries do not meet the criteria or are missing required fields, set the error message in the upload table and do not proceed further
+        IF failing_entries_invalid_json IS NOT NULL or failing_entries_first_name IS NOT NULL OR failing_entries_last_name IS NOT NULL OR failing_entries_dob IS NOT NULL THEN
+            UPDATE individual_individualdatasourceupload
+            SET error = coalesce(error, '{}'::jsonb) || jsonb_build_object('errors', jsonb_build_object(
+                                'error', 'Invalid entries',
+                                'timestamp', NOW()::text,
+                                'upload_id', current_upload_id::text,
+                                'failing_entries_first_name', failing_entries_first_name,
+                                'failing_entries_last_name', failing_entries_last_name,
+                                'failing_entries_dob', failing_entries_dob,
+                                'failing_entries_invalid_json', failing_entries_invalid_json
+                            ))
+            WHERE "UUID" = current_upload_id;
+
+            update individual_individualdatasourceupload set status='FAIL' where "UUID" = current_upload_id;
+        -- If no invalid entries, then proceed with the data manipulation
+        ELSE
+            BEGIN
+                WITH new_entry AS (
+                    INSERT INTO individual_individual(
+                        "UUID", "isDeleted", version, "UserCreatedUUID", "UserUpdatedUUID",
+                        "Json_ext", first_name, last_name, dob
+                    )
+                    SELECT gen_random_uuid(), false, 1, userUUID, userUUID,
+                           "Json_ext", "Json_ext"->>'first_name', "Json_ext" ->> 'last_name', to_date("Json_ext" ->> 'dob', 'YYYY-MM-DD')
+                    FROM individual_individualdatasource
+                    WHERE upload_id=current_upload_id and individual_id is null and "isDeleted"=False
+                    RETURNING "UUID", "Json_ext"  -- also return the Json_ext
+                )
+                UPDATE individual_individualdatasource
+                SET individual_id = new_entry."UUID"
+                FROM new_entry
+                WHERE upload_id=current_upload_id
+                    and individual_id is null
+                    and "isDeleted"=False
+                    and individual_individualdatasource."Json_ext" = new_entry."Json_ext";  -- match on Json_ext
+
+
+                with new_entry_2 as (INSERT INTO social_protection_beneficiary(
+                    "UUID", "isDeleted", "Json_ext", "DateCreated", "DateUpdated", version, "DateValidFrom", "DateValidTo", status, "benefit_plan_id", "individual_id", "UserCreatedUUID", "UserUpdatedUUID"
+                )
+                SELECT gen_random_uuid(), false, iids."Json_ext" - 'first_name' - 'last_name' - 'dob', NOW(), NOW(), 1, NOW(), NULL, 'POTENTIAL', benefitPlan, new_entry."UUID", userUUID, userUUID
+                FROM individual_individualdatasource iids right join individual_individual new_entry on new_entry."UUID" = iids.individual_id
+                WHERE iids.upload_id=current_upload_id and iids."isDeleted"=false
+                returning "UUID")
+
+
+                update individual_individualdatasourceupload set status='SUCCESS', error='{}' where "UUID" = current_upload_id;
+            EXCEPTION
+                WHEN OTHERS then
+
+                    update individual_individualdatasourceupload set status='FAIL' where "UUID" = current_upload_id;
+                    UPDATE individual_individualdatasourceupload
+                    SET error = coalesce(error, '{}'::jsonb) || jsonb_build_object('errors', jsonb_build_object(
+                                        'error', SQLERRM,
+                                        'timestamp', NOW()::text,
+                                        'upload_id', current_upload_id::text
+                                    ))
+                    WHERE "UUID" = current_upload_id;
+            END;
+        END IF;
+END $$;
+        """, [current_upload_id, userUUID, benefitPlan])
+        # Process the cursor results or handle exceptions
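For a quick sanity check of the workflow above outside the regular upload flow, the function can be invoked directly once `enable_python_workflows` is on, for example from a Django shell. A sketch, assuming an existing core user, benefit plan, and a previously saved upload; the three UUIDs below are placeholders:

```python
# Dev-only sketch: run the Python upload workflow by hand, e.g. from `python manage.py shell`.
# The UUIDs are placeholders for an existing core User, BenefitPlan and IndividualDataSourceUpload.
from social_protection.workflows import process_import_beneficiaries_workflow

process_import_beneficiaries_workflow(
    user_uuid="11111111-1111-1111-1111-111111111111",
    benefit_plan_uuid="22222222-2222-2222-2222-222222222222",
    upload_uuid="33333333-3333-3333-3333-333333333333",
)
# On success the upload row ends up with status 'SUCCESS'; header or schema problems surface as
# PythonWorkflowHandlerException, and uploads with invalid rows are routed through
# create_task_with_importing_valid_items instead of the SQL insert.
```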