feat: API end point for CSV frictionless-validator tech-by-design#996
megin1989 committed Jan 6, 2025
1 parent 7e1203e commit d0ba592
Showing 4 changed files with 331 additions and 0 deletions.
120 changes: 120 additions & 0 deletions support/service/frictionless-validator/README.md
@@ -0,0 +1,120 @@

# CSV Validation Service via FastAPI

## Overview

This project provides a robust solution for validating CSV files using the [Frictionless Framework](https://frictionlessdata.io/) and exposing the validation service through a [FastAPI](https://fastapi.tiangolo.com/) backend.

## Features

- **CSV Schema Validation**: Ensures CSV files conform to predefined schemas.
- **Flat File and JSON Specifications**: Detailed specifications for flat file validations and JSON schemas are available [here](https://github.com/megin1989/polyglot-prime/blob/main/support/specifications/flat-file/README.md).
- **Mapping CSV Fields to FHIR IG Elements**: A detailed mapping of each field in the provided CSV files to their corresponding FHIR IG elements is documented in the [`documentation.auto.md`](https://github.com/tech-by-design/polyglot-prime/blob/main/support/specifications/flat-file/documentation.auto.md) file. This file provides a comprehensive field-by-field mapping and serves as a guide to understanding how the CSV files translate into the FHIR resources.
- **File Upload Examples**: Examples of file uploads are detailed in the [nyher-fhir-ig-example](https://github.com/tech-by-design/polyglot-prime/blob/main/support/specifications/flat-file/nyher-fhir-ig-example/README.md).
- **FastAPI Integration**: Exposes the validation functionality via a RESTful API.
- **Detailed Error Reporting**: Highlights specific issues in the uploaded CSV files.
- **Lightweight and Scalable**: Built with Python for performance and ease of use.

## Technologies Used

- **Frictionless Framework**: For defining schemas and validating CSV files.
- **FastAPI**: For creating a fast and modern web API.
- **Python**: The core programming language for this project.


## Installation


Before you can use this tool, make sure you have the following installed on your system:

- **Python 3.x**:
Ensure that Python 3 is installed on your system. You can check if Python 3 is already installed by running the following command:

```bash
python3 --version
```
If Python 3 is not installed, follow the instructions below to install it:
- Ubuntu/Debian-based systems:
```bash
sudo apt update
sudo apt install python3
```
- macOS (using Homebrew):
```bash
brew install python
```
- Windows: Download and install the latest version of Python from the official website: https://www.python.org/downloads/
- **pip (Python Package Installer)**: pip is the package manager for Python and is needed to install libraries like Frictionless.
Check if pip is installed by running:
```bash
python3 -m pip --version
```
If pip is not installed, follow these steps:
- On Ubuntu/Debian-based systems:
```bash
sudo apt install python3-pip
```
- On macOS: pip ships with the Python installed via Homebrew; if it is missing, run:
```bash
python3 -m ensurepip --upgrade
```
- On Windows: If pip isn't already installed with Python, you can get it from the [official Python pip installation guide](https://pip.pypa.io/en/stable/installation/).
***Troubleshooting***:
If you encounter errors such as `No module named ensurepip`, your Python installation may be missing the `ensurepip` module, which is normally used to bootstrap pip. In that case, install pip with your operating system's package manager as described above, or run:
```bash
python3 -m ensurepip --upgrade
```
### Steps to set up the FastAPI service
1. Create a virtual environment:
```python3 -m venv venv```
2. Activate the virtual environment:
```source venv/bin/activate``` (use `venv/bin/activate.fish` if your shell is fish)
3. Install the requirements from the requirements.txt file to the virtual environment:
```pip install -r requirements.txt```
4. Prepare the application environment using the configuration file by running:
```source .env```
5. Run the FastAPI application:
```uvicorn main:app --reload```
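Step 4 assumes a `.env` file that exports the variables the service reads at startup; the validator module reads `SPEC_PATH` (path to the package spec) and `PATH_LOG` (log file). A small pre-flight check before launching uvicorn can catch a missing variable early. This helper is an illustrative sketch, not part of the service:

```python
import os

# Variable names the validator service reads at startup; the .env file
# from step 4 is expected to export both of them.
REQUIRED_VARS = ("SPEC_PATH", "PATH_LOG")

def missing_env_vars(environ=os.environ):
    """Return the required variable names that are unset or empty."""
    return [name for name in REQUIRED_VARS if not environ.get(name)]

# Example: warn before launching `uvicorn main:app --reload`.
missing = missing_env_vars()
if missing:
    print(f"Set these variables before starting uvicorn: {missing}")
```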
## About the File Naming Convention
The CSV file names in this project follow a strict naming convention to ensure consistency and compatibility with the validation process. Each file name is structured as follows:
**`<DATA_TYPE>_<GROUP_IDENTIFIER>.csv`**
### Components of the File Name
1. **`<DATA_TYPE>`**:
- This is the predefined and mandatory part of the file name. It indicates the type of data contained in the file and must remain unchanged.
- Examples of valid values:
- `QE_ADMIN_DATA_`
- `SCREENING_PROFILE_DATA_`
- `SCREENING_OBSERVATION_DATA_`
- `DEMOGRAPHIC_DATA_`
2. **`<GROUP_IDENTIFIER>`**:
- This part of the file name is flexible and includes the following components:
- **QE Name or Organization**: Represents the entity providing the data (e.g., `partner1-test`).
- **Date**: The date the data was generated or collected, formatted as `YYYYMMDD` (e.g., `20241128`).
- **Test Case or Scenario Identifier**: A specific identifier to distinguish different test cases or scenarios (e.g., `testcase1`).
### Example File Names
- `QE_ADMIN_DATA_partner1-test-20241128-testcase1.csv`
- `SCREENING_PROFILE_DATA_partner1-test-20241128-testcase1.csv`
- `SCREENING_OBSERVATION_DATA_partner1-test-20241128-testcase1.csv`
- `DEMOGRAPHIC_DATA_partner1-test-20241128-testcase1.csv`
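The convention above can be checked mechanically. The following is a hypothetical helper, not part of the service: it assumes the group identifier is `<qe-name>-<YYYYMMDD>-<testcase>` as in the examples, and tests a file name against the four valid data types.

```python
import re

# The four predefined, mandatory <DATA_TYPE> prefixes.
DATA_TYPES = (
    "QE_ADMIN_DATA_",
    "SCREENING_PROFILE_DATA_",
    "SCREENING_OBSERVATION_DATA_",
    "DEMOGRAPHIC_DATA_",
)

def matches_convention(filename: str) -> bool:
    """Check <DATA_TYPE>_<GROUP_IDENTIFIER>.csv against the naming rules above."""
    for prefix in DATA_TYPES:
        if filename.startswith(prefix):
            group = filename[len(prefix):]
            # group identifier: QE name, YYYYMMDD date, test-case id, .csv extension
            return re.fullmatch(r"[A-Za-z0-9-]+-\d{8}-[A-Za-z0-9]+\.csv", group) is not None
    return False
```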
17 changes: 17 additions & 0 deletions support/service/frictionless-validator/main.py
@@ -0,0 +1,17 @@
from fastapi import FastAPI
from validate_service_nyher_fhir_ig_equivalent import validation_router


def start_application():
    app = FastAPI()
    app.include_router(validation_router)
    return app


app = start_application()


@app.get("/")
def start_api():
    return {"API": "API initializing"}
5 changes: 5 additions & 0 deletions support/service/frictionless-validator/requirements.txt
@@ -0,0 +1,5 @@
fastapi==0.115.6
uvicorn==0.33.0
pydantic==2.10.4
python-multipart==0.0.20
frictionless==5.18.0
189 changes: 189 additions &amp; 0 deletions support/service/frictionless-validator/validate_service_nyher_fhir_ig_equivalent.py
@@ -0,0 +1,189 @@
import json
import logging
import os
import shutil
from datetime import datetime, date

from fastapi import APIRouter, File, HTTPException, UploadFile
from frictionless import Package, extract, steps, transform

# Log to the file given by PATH_LOG at DEBUG level; this fails fast if
# PATH_LOG is unset.
path_log = os.environ.get("PATH_LOG")
FORMAT = "%(filename)s - line:%(lineno)s - %(funcName)2s() -%(levelname)s %(asctime)-15s %(message)s"
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
fh = logging.FileHandler(path_log)
fh.setFormatter(logging.Formatter(FORMAT))
logger.addHandler(fh)
logger.setLevel(logging.DEBUG)

spec_path = os.getenv("SPEC_PATH")
validation_router = APIRouter()


def custom_json_encoder(obj):
    if isinstance(obj, (datetime, date)):
        return obj.isoformat()
    raise TypeError(f"Object of type {obj.__class__.__name__} is not JSON serializable")

def validate_package(spec_path, file1, file2, file3, file4):
    results = {
        "errorsSummary": [],
        "report": None,
        "originalData": {}
    }

    try:
        with open(spec_path) as f:
            spec = json.load(f)
        file_mappings = {
            "qe_admin_data": file1,
            "screening_profile_data": file2,
            "screening_observation_data": file3,
            "demographic_data": file4,
        }
        missing_files = {key: path for key, path in file_mappings.items() if not os.path.isfile(path)}
        if missing_files:
            for resource_name, file_path in missing_files.items():
                results["errorsSummary"].append({
                    "fieldName": resource_name,
                    "message": f"File for resource '{resource_name}' not found: {file_path}",
                    "type": "file-missing-error"
                })
            return results

        for resource_name, file_path in file_mappings.items():
            rows = extract(file_path)
            results["originalData"][resource_name] = rows

        resources = []
        for resource in spec["resources"]:
            path = file_mappings.get(resource["name"])
            if not path:
                raise FileNotFoundError(f"File for resource '{resource['name']}' not found.")
            resources.append({**resource, "path": path})
        package = Package({
            "name": "csv-validation-using-ig",
            "resources": resources
        })
        common_transform_steps = [
            ("ORGANIZATION_TYPE_DISPLAY", "organization_type_display"),
            ("FACILITY_STATE", "facility_state"),
            ("SCREENING_STATUS_CODE", "screening_status_code"),
            ("SCREENING_CODE_DESCRIPTION", "screening_code_description"),
            ("QUESTION_CODE_TEXT", "question_code_text"),
            ("ANSWER_CODE", "answer_code"),
            ("ANSWER_CODE_DESCRIPTION", "answer_code_description"),
            ("GENDER", "gender"),
            ("EXTENSION_SEX_AT_BIRTH_CODE_VALUE", "extension_sex_at_birth_code_value"),
            ("RELATIONSHIP_PERSON_CODE", "relationship_person_code"),
            ("RELATIONSHIP_PERSON_DESCRIPTION", "relationship_person_description"),
            ("STATE", "state"),
            ("EXTENSION_GENDER_IDENTITY_DISPLAY", "extension_gender_identity_display"),
            ("GENDER_IDENTITY_CODE", "gender_identity_code"),
            ("SEXUAL_ORIENTATION_VALUE_CODE_DESCRIPTION", "sexual_orientation_value_code_description"),
            ("PREFERRED_LANGUAGE_CODE_SYSTEM_NAME", "preferred_language_code_system_name"),
            ("EXTENSION_OMBCATEGORY_RACE_CODE", "extension_ombcategory_race_code"),
            ("EXTENSION_OMBCATEGORY_RACE_CODE_DESCRIPTION", "extension_ombcategory_race_code_description"),
            ("EXTENSION_OMBCATEGORY_ETHNICITY_CODE_DESCRIPTION", "extension_ombcategory_ethnicity_code_description"),
            ("CONSENT_STATUS", "consent_status"),
            ("OBSERVATION_CATEGORY_SDOH_CODE", "observation_category_sdoh_code"),
            ("OBSERVATION_CATEGORY_SDOH_DISPLAY", "observation_category_sdoh_display"),
            ("QUESTION_CODE_DISPLAY", "question_code_display"),
            ("DATA_ABSENT_REASON_CODE", "data_absent_reason_code"),
            ("DATA_ABSENT_REASON_DISPLAY", "data_absent_reason_display")
        ]

        for resource in package.resources:
            transform_steps = [
                steps.cell_convert(
                    field_name=field_name,
                    # Skip non-string cells (e.g. None) instead of raising AttributeError.
                    function=lambda value: value.lower() if isinstance(value, str) else value,
                )
                for field_name, _ in common_transform_steps
                if any(field.name == field_name for field in resource.schema.fields)
            ]
            resource = transform(resource, steps=transform_steps)
        report = package.validate()
        results["report"] = report.to_dict()
    except FileNotFoundError as e:
        results["errorsSummary"].append({
            "fieldName": None,
            "message": str(e),
            "type": "file-missing-error"
        })
    except Exception as e:
        results["errorsSummary"].append({
            "fieldName": None,
            "message": str(e),
            "type": "unexpected-error"
        })
    return results
@validation_router.post("/validate_service_nyher_fhir_ig_equivalent/")
async def validate(
    QE_ADMIN_DATA_FILE: UploadFile = File(..., description="File should start with QE_ADMIN_DATA_"),
    SCREENING_PROFILE_DATA_FILE: UploadFile = File(..., description="File should start with SCREENING_PROFILE_DATA_"),
    SCREENING_OBSERVATION_DATA_FILE: UploadFile = File(..., description="File should start with SCREENING_OBSERVATION_DATA_"),
    DEMOGRAPHIC_DATA_FILE: UploadFile = File(..., description="File should start with DEMOGRAPHIC_DATA_")
):
    spec_path = os.getenv("SPEC_PATH")
    if not spec_path:
        logger.error("SPEC_PATH environment variable is not set.")
        raise HTTPException(status_code=400, detail="SPEC_PATH not set")
    logger.debug(f"SPEC_PATH: {spec_path}")
    if not os.path.isfile(spec_path):
        logger.error(f"Spec file not found at {spec_path}")
        raise HTTPException(status_code=400, detail=f"Spec file not found at {spec_path}")
    file_validations = {
        "QE_ADMIN_DATA_FILE": QE_ADMIN_DATA_FILE.filename.startswith("QE_ADMIN_DATA_"),
        "SCREENING_PROFILE_DATA_FILE": SCREENING_PROFILE_DATA_FILE.filename.startswith("SCREENING_PROFILE_DATA_"),
        "SCREENING_OBSERVATION_DATA_FILE": SCREENING_OBSERVATION_DATA_FILE.filename.startswith("SCREENING_OBSERVATION_DATA_"),
        "DEMOGRAPHIC_DATA_FILE": DEMOGRAPHIC_DATA_FILE.filename.startswith("DEMOGRAPHIC_DATA_")
    }
    for file_key, is_valid in file_validations.items():
        if not is_valid:
            expected_prefix = file_key.replace("_FILE", "") + "_"
            logger.error(f"Invalid file name for {file_key}. Must start with {expected_prefix}")
            raise HTTPException(status_code=400, detail=f"Invalid file name for {file_key}. Must start with {expected_prefix}")
    # Save the uploads to a timestamped working directory.
    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
    file_dir = "nyher-fhir-ig-example" + timestamp
    os.makedirs(file_dir, exist_ok=True)
    temp_files = []
    files = [QE_ADMIN_DATA_FILE, SCREENING_PROFILE_DATA_FILE, SCREENING_OBSERVATION_DATA_FILE, DEMOGRAPHIC_DATA_FILE]
    for file in files:
        temp_file_path = os.path.join(file_dir, file.filename)
        logger.debug(f"Saving file {file.filename} to {temp_file_path}")
        try:
            with open(temp_file_path, "wb") as temp_file:
                temp_file.write(await file.read())
            temp_files.append(temp_file_path)
            logger.debug(f"File {file.filename} saved successfully to {temp_file_path}")
        except Exception as e:
            logger.error(f"Failed to save file {file.filename}: {e}")
            raise HTTPException(status_code=500, detail=f"Failed to save {file.filename}")
    try:
        logger.debug("Calling validate_package with spec file and temp files.")
        results = validate_package(spec_path, *temp_files)
        logger.debug("Validation completed successfully.")
    except Exception as e:
        logger.error(f"Validation failed: {e}")
        results = {"error": "Validation process failed."}
    finally:
        # Clean up the working directory along with the saved uploads.
        try:
            shutil.rmtree(file_dir)
            logger.debug(f"Removed folder: {file_dir}")
        except Exception as e:
            logger.error(f"Failed to clean up: {e}")
    return results
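As a usage note on `custom_json_encoder` above: it has the shape expected by the `default=` hook of `json.dumps`, converting `datetime`/`date` values to ISO-8601 strings so the validation results can be serialized. A self-contained sketch (the function is repeated here so the snippet runs on its own):

```python
import json
from datetime import datetime, date

# Mirrors custom_json_encoder from the validator module.
def custom_json_encoder(obj):
    if isinstance(obj, (datetime, date)):
        return obj.isoformat()
    raise TypeError(f"Object of type {obj.__class__.__name__} is not JSON serializable")

# json.dumps cannot serialize date objects on its own; the default= hook
# converts them to ISO-8601 strings.
payload = {"validated_at": date(2025, 1, 6), "errorsSummary": []}
encoded = json.dumps(payload, default=custom_json_encoder)
```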
