Merge pull request #49 from Dewberry/refactor

Refactor

slawler authored Jun 27, 2024
2 parents 86b83d0 + 6fc2aee commit 317b53d
Showing 65 changed files with 2,142 additions and 2,449 deletions.
9 changes: 9 additions & 0 deletions .flaskenv
@@ -0,0 +1,9 @@
# Do not put secrets in here! This file is in the repo.

FLASK_APP=api.app
FLASK_DEBUG=0
FLASK_RUN_RELOAD=0
FLASK_RUN_HOST=0.0.0.0
FLASK_RUN_PORT=80

# Do not put secrets in here! This file is in the repo.
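
Note: Flask only loads .flaskenv (and `.env`) automatically when the python-dotenv package is installed in the environment.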
69 changes: 69 additions & 0 deletions .github/workflows/ci.yaml
@@ -0,0 +1,69 @@
name: CI

on:
  push:
    branches: [ "main" ]
  pull_request:
    branches: [ "main" ]

permissions:
  contents: read

concurrency:
  group: ${{ github.ref }}
  cancel-in-progress: true

jobs:
  build:

    strategy:
      matrix:
        python-version:
          - "3.9"
          - "3.10"
          - "3.11"
          - "3.12"
        os: ["windows-latest"]

    runs-on: ${{ matrix.os }}
    environment: ci-env

    steps:
      - uses: actions/checkout@v4

      - name: Set up Python ${{ matrix.python-version }}
        uses: actions/setup-python@v5
        with:
          python-version: ${{ matrix.python-version }}

      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install ".[dev]"

      # - name: Lint (ruff)
      #   run: |
      #     ruff check
      #     ruff format --check

      - name: Run tests
        run: |
          pytest tests/conflation_tests.py
          pytest tests/read_ras_files_tests.py

      # - name: Run tests with coverage
      #   run: |
      #     pytest --cov --cov-report term-missing

      # - name: Run tests with coverage
      #   run: |
      #     pytest --cov=src --cov-report term-missing

      # - name: Run coverage
      #   run: |
      #     coverage xml

      # - name: Upload coverage reports to Codecov
      #   uses: codecov/codecov-action@v4
      #   with:
      #     token: ${{ secrets.CODECOV_TOKEN }}
6 changes: 5 additions & 1 deletion .gitignore
@@ -163,11 +163,15 @@ test.py
*sqlite.py
*process.ipynb
*.db
*.db-shm
*.db-wal
tests/ras-data/Baxter/test
tests/ras-data/Baxter/nwm_models
tests/ras-data/Baxter/Terrain
tests/ras-data/WFSJMAIN/
/ripple/models/
.vscode/settings.json
mip_models
tests/ras-data/combined
dev
/api/logs/*.log
9 changes: 9 additions & 0 deletions .pre-commit-config.yaml
@@ -0,0 +1,9 @@
repos:
  - repo: https://github.com/astral-sh/ruff-pre-commit
    # Ruff version.
    rev: v0.4.2
    hooks:
      # Run the linter.
      - id: ruff
      # Run the formatter.
      - id: ruff-format
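
With the pre-commit package installed, running `pre-commit install` once in a clone activates these hooks, so ruff lints and formats staged files on every commit.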
2 changes: 2 additions & 0 deletions .pylinyrc
@@ -0,0 +1,2 @@
[FORMAT]
max-line-length=120
75 changes: 44 additions & 31 deletions README.md
@@ -2,38 +2,10 @@
Utilities for reuse of HEC-RAS models for NWM. HEC-RAS models must be cataloged in a SpatioTemporal Asset Catalog (STAC) and conflated with the NWM reach dataset prior to processing through ripple.

## Contents
- /conflate: RAS-FIM Conflation
- /stacio: Build and update STAC items
- /exe: Build RAS Terrains and add to STAC items
- /examples: examples

Refactor in progress

## Getting Started
To utilize this repository to develop flood inundation maps from 1D steady-state HEC-RAS models, the following are required:
- A Windows operating system
- HEC-RAS 6.3.1 installed
- Python packages specified in requirements-windows.txt installed
- Access to S3
- An href for a STAC item containing the following minimum content:
  - Assets representing the necessary HEC-RAS files with role assigned as "RAS-FILE"
    - HEC-RAS project file
    - HEC-RAS geometry file
    - HEC-RAS plan file
    - HEC-RAS flow file

  - Assets for HEC-RAS terrain data with role assigned as "RAS-TOPO"
    - RAS_Terrain.hdf
    - RAS_Terrain.tif
    - RAS_Terrain.vrt

  - An asset for a geopackage containing the model geometry with role "RAS-GEOMETRY-GPKG"

  - An asset containing conflation parameters with the assigned role "RIPPLE-PARAMS".
    The conflation parameters asset should be a nested JSON which contains a key for each NWM branch associated with the RAS model. The parameters shown below must exist for each NWM branch.

![alt text](image-2.png)
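
For orientation only, below is a hypothetical sketch of that nested structure once parsed into a Python dict. Every key name is an invented placeholder; the actual parameter names are those shown in the image above.

```python
# Hypothetical "RIPPLE-PARAMS" shape: one entry per NWM branch ID, each holding
# that branch's conflation parameters. All names below are placeholders.
ripple_params = {
    "2823932": {  # NWM branch ID (illustrative)
        "us_xs": {"river": "Baxter River", "reach": "1", "xs_id": "47694.0"},
        "ds_xs": {"river": "Baxter River", "reach": "1", "xs_id": "23412.0"},
        "flows": [100.0, 250.0, 500.0],
    },
}

for branch_id, params in ripple_params.items():
    print(branch_id, sorted(params))  # every branch must carry the same parameter set
```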


Refactor in progress

## About
Producing inundation maps at half-foot increments for each NWM branch in a given RAS model is a multi-step process outlined below. `run_process.py` is a script that executes the process in the necessary sequential order.
@@ -52,4 +24,45 @@ Producing inundation maps at half-foot increments for each NWM branch in a given
- The water surface elevations derived from the downstream rating-curve are applied as an intermediate known water surface elevation at the downstream terminus of the NWM branch
9. Run the production runs with post-processing depth rasters toggled on.
10. For each NWM branch, clip each resulting depth raster to a concave hull derived from the cross sections associated with that branch (see the sketch after this list).
11. For each NWM branch, read the HDF results and write rating curves to a SQLite db.
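
As a rough illustration of step 10 only, here is a minimal sketch assuming shapely >= 2.0 (for `concave_hull`) and rasterio; the function name, `ratio` value, and file handling are illustrative, not ripple's actual implementation:

```python
import rasterio
from rasterio.mask import mask
from shapely import concave_hull
from shapely.geometry import MultiLineString, mapping


def clip_depth_raster(depth_raster_path: str, xs_lines: list, out_path: str, ratio: float = 0.5) -> None:
    """Clip one depth raster to a concave hull of a branch's cross-section lines.

    xs_lines: one [(x, y), ...] coordinate list per cross section, in the raster's CRS.
    """
    hull = concave_hull(MultiLineString(xs_lines), ratio=ratio)
    with rasterio.open(depth_raster_path) as src:
        clipped, transform = mask(src, [mapping(hull)], crop=True)
        meta = src.meta.copy()
    meta.update(height=clipped.shape[1], width=clipped.shape[2], transform=transform)
    with rasterio.open(out_path, "w", **meta) as dst:
        dst.write(clipped)
```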

<br>
<br>
<br>

# REST API

## About the API

The REST API server leverages huey for concurrent task (job) execution, and Flask as the user HTTP REST interface into the huey
system. The HTTP endpoints adhere closely to [OGC Process API](https://ogcapi.ogc.org/processes/overview.html) standards.

## Environment Requirements of the API

1. A Windows host with Desktop Experience, due to the API's use of the HEC-RAS GUI.
1. Python 3.8 or higher, with dependencies installed per requirements.txt.
1. HEC-RAS installed, and EULA accepted. For each version of RAS, open the program once to read and accept the EULA.

## API Launch Steps

1. If necessary, edit [.flaskenv](.flaskenv) (do not store any secrets or credentials in this file!)
1. If necessary, initialize or edit `.env`, e.g. for data access credentials. Care should be taken not to include the same variable names in [.flaskenv](.flaskenv) and in `.env`.
1. If necessary, edit [api-start.bat](api-start.bat) to:
1. Adjust the number of threads (cores) that huey will use to process concurrent tasks (jobs). By default, [api-start.bat](api-start.bat) is configured to inspect the number of cores on the host machine and use a value of (N - 2), with a minimum of 1.
1. Adjust the huey logging behavior. By default, the huey logs will not stream to the terminal, and will instead stream to a local file.
1. Adjust the huey consumer path. By default, a typical such path is assumed in [api-start.bat](api-start.bat).
1. Double-click [api-start.bat](api-start.bat). This will cause two Windows terminals to open. One will be running huey and the other will be running Flask. **Warning: this will delete and re-initialize the `api\logs\` directory, which includes the huey tasks database in addition to the log files.**
1. Double-click [api-test.bat](api-test.bat). This will send a request to the ping endpoint of the API confirming that it is online and ready to process jobs.
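
For a quick manual check without the batch file, a sketch like the following also works, assuming the `requests` package is installed and the default host/port from [.flaskenv](.flaskenv):

```python
import requests

# /ping is defined in api/app.py; .flaskenv binds Flask to 0.0.0.0:80
resp = requests.get("http://localhost:80/ping", timeout=10)
print(resp.status_code, resp.json())  # expected: 200 {'status': 'healthy'}
```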

## API Administration Notes

**Warning: [api-start.bat](api-start.bat) will delete and re-initialize the `api\logs\` directory, which includes the huey tasks database in addition to the log files.**

huey is configured to use a local SQLite database as its store for managing tasks and storing their returned values. If the db file
does not exist, it will be created when the huey consumer starts. If it does exist, it will be used as-is and not overwritten.
Therefore, if the server administrator needs to ungracefully stop all tasks and/or restart the server, and wants to be sure that
any existing tasks are fully cleared from the system, they should manually delete the db file before restarting the server.

The SQLite store is highly stable, but in rare instances a database "lock" race condition can occur under heavy load.
If this is a concern, the server administrator can reconfigure huey to use another store that huey supports, such as Redis (see the sketch below).
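
As a sketch of what that reconfiguration could look like, assuming the huey instance lives in `api/tasks.py` (a module not shown in this diff) and an assumed db path:

```python
from huey import RedisHuey, SqliteHuey

# Default: a local SQLite file, created on first consumer start if missing.
huey = SqliteHuey(filename="api/logs/huey.db")  # assumed path

# Alternative for heavy concurrent load: a Redis-backed store.
# huey = RedisHuey("ripple", host="localhost", port=6379)
```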
34 changes: 34 additions & 0 deletions api-start.bat
@@ -0,0 +1,34 @@
@echo off

:: Use (number of cores - 2) worker threads, with a minimum of 1, per the README
set /a thread_count=%NUMBER_OF_PROCESSORS%-2
if %thread_count% lss 1 set thread_count=1

:: Identify the full path to huey_consumer.py (ships with huey library)
set huey_consumer_full_path=
for /f "delims=" %%i in ('where huey_consumer.py') do set huey_consumer_full_path=%%i

:: Set up logs dir
set logs_dir="api\logs\"
echo "Deleting logs dir if exists: %logs_dir%"
rmdir /s /q %logs_dir%
if exist %logs_dir% (
echo Error: could not delete %logs_dir%
echo Press any key to exit.
set /p input=
exit /b 1
)
echo "Creating logs dir: %logs_dir%"
if not exist %logs_dir% mkdir %logs_dir%
if %errorlevel% neq 0 (
echo Error: could not create %logs_dir%
echo Press any key to exit.
set /p input=
exit /b 1
)

:: Launch huey consumer in separate terminal
echo "Starting ripple-huey"
start "ripple-huey" cmd /k "python -u %huey_consumer_full_path% api.tasks.huey -w %thread_count%"

:: Launch Flask app in a separate terminal
echo "Starting ripple-flask"
start "ripple-flask" cmd /k "flask run"
10 changes: 10 additions & 0 deletions api-test.bat
@@ -0,0 +1,10 @@
@echo off

set api_test_script=api\run_api_test.py
echo Running script: %api_test_script%

python -u "%api_test_script%"

if %errorlevel% neq 0 (echo Error above with code %errorlevel%.) else (echo Success.)
echo Press enter to exit this test terminal.
set /p input=
132 changes: 132 additions & 0 deletions api/app.py
@@ -0,0 +1,132 @@
from __future__ import annotations

import time
import traceback
from http import HTTPStatus
from typing import Callable

from flask import Flask, Response, jsonify, request
from werkzeug.exceptions import BadRequest

from api import tasks
from api.utils import get_unexpected_and_missing_args

app = Flask(__name__)


@app.route("/ping", methods=["GET"])
def ping():
    return jsonify({"status": "healthy"}), HTTPStatus.OK


@app.route("/processes/test/execution", methods=["POST"])
def test():
    response, http_status = process_async_request(tasks.noop)
    if http_status != HTTPStatus.CREATED:
        return jsonify(response.json), HTTPStatus.INTERNAL_SERVER_ERROR

    timeout_seconds = 10

    start = time.time()
    while time.time() - start < timeout_seconds:
        time.sleep(0.2)
        status = tasks.status(response.json["jobID"])
        if status == "failed":
            return jsonify({"status": "not healthy"}), HTTPStatus.INTERNAL_SERVER_ERROR
        if status == "successful":
            return jsonify({"status": "healthy"}), HTTPStatus.OK
    return (
        jsonify({"status": f"huey is busy or not active, ping timed out after {timeout_seconds} seconds"}),
        HTTPStatus.GATEWAY_TIMEOUT,
    )


@app.route("/jobs/<task_id>", methods=["GET"])
def task_status(task_id):
    # https://developer.ogc.org/api/processes/index.html#tag/Status
    status = tasks.status(task_id)
    if status in ("accepted", "running"):
        return jsonify({"type": "process", "jobID": task_id, "status": status}), HTTPStatus.OK
    if status in ("successful", "failed", "dismissed"):
        return (
            jsonify({"type": "process", "jobID": task_id, "status": status, "detail": tasks.result(task_id)}),
            HTTPStatus.OK,
        )
    if status == "notfound":
        return jsonify({"type": "process", "detail": f"job ID not found: {task_id}"}), HTTPStatus.NOT_FOUND
    return jsonify({"type": "process", "detail": f"unexpected status: {status}"}), HTTPStatus.INTERNAL_SERVER_ERROR


@app.route("/jobs/<task_id>/results", methods=["GET"])
def task_result(task_id):
    # https://developer.ogc.org/api/processes/index.html#tag/Result
    try:
        status = tasks.status(task_id)
        if status == "notfound":
            return jsonify({"type": "process", "detail": f"job ID not found: {task_id}"}), HTTPStatus.NOT_FOUND
        else:
            return jsonify({"type": "process", "detail": tasks.result(task_id)}), HTTPStatus.OK
    except Exception:
        return jsonify({"type": "process", "detail": "failed to fetch results"}), HTTPStatus.INTERNAL_SERVER_ERROR


@app.route("/jobs/<task_id>/dismiss", methods=["DELETE"])
def dismiss(task_id):
    # https://developer.ogc.org/api/processes/index.html#tag/Dismiss
    try:
        status = tasks.status(task_id)
        if status == "notfound":
            return jsonify({"type": "process", "detail": f"job ID not found: {task_id}"}), HTTPStatus.NOT_FOUND
        else:
            result = tasks.revoke(task_id)
            return jsonify({"type": "process", "detail": result}), HTTPStatus.OK
    except Exception:
        return (
            jsonify({"type": "process", "detail": f"failed to dismiss job ID: {task_id}"}),
            HTTPStatus.INTERNAL_SERVER_ERROR,
        )


def process_async_request(func: Callable) -> tuple[Response, HTTPStatus]:
    """Start the execution of the provided func using kwargs from the request body (assumed to be a JSON dictionary)."""
    # https://developer.ogc.org/api/processes/index.html#tag/Execute/operation/execute
    try:
        kwargs = request.json  # can raise BadRequest when parsing the body into JSON
        if not isinstance(kwargs, dict):
            raise BadRequest
    except BadRequest:
        return (
            jsonify({"type": "process", "detail": "could not parse body to json dict"}),
            HTTPStatus.BAD_REQUEST,
        )

    unexpected, missing = get_unexpected_and_missing_args(func, set(kwargs))
    if unexpected or missing:
        return (
            jsonify({"type": "process", "detail": f"unexpected args: {unexpected}, missing args: {missing}"}),
            HTTPStatus.BAD_REQUEST,
        )

    try:
        # tasks.process must be a huey TaskWrapper (decorated by @huey.task). Returns a huey Result object immediately.
        result = tasks.process(func, kwargs)
    except Exception:
        msg = f"unable to submit task: {traceback.format_exc()}"
        app.logger.error(msg)
        return jsonify({"type": "process", "detail": msg}), HTTPStatus.INTERNAL_SERVER_ERROR
    else:
        return (
            jsonify({"type": "process", "jobID": result.id, "status": "accepted"}),
            HTTPStatus.CREATED,
        )
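
Neither `api/tasks.py` nor `api/utils.py` loaded in this diff view. For context, here is a minimal sketch of what the pieces used above might look like; the module contents, names, and db path are assumptions, and only documented huey APIs (`SqliteHuey`, `Huey.result`, `Huey.revoke_by_id`) are used.

```python
# Sketch of api/tasks.py (assumed; not shown in this diff)
from huey import SqliteHuey

huey = SqliteHuey(filename="api/logs/huey.db")  # assumed store location


def noop():
    """Trivial callable used by the /processes/test/execution health check."""
    return "ok"


@huey.task()
def process(func, kwargs):
    """Enqueue func(**kwargs); calling this returns a huey Result immediately."""
    # huey pickles module-level callables by reference, so plain functions work here
    return func(**kwargs)


def result(task_id):
    """Fetch a stored return value without consuming it."""
    return huey.result(task_id, preserve=True)


def revoke(task_id):
    """Prevent a pending task from running."""
    return huey.revoke_by_id(task_id)


# The real module also implements status(); mapping huey's internal state onto the
# OGC statuses ("accepted", "running", "successful", ...) is omitted from this sketch.
```

Similarly, a plausible `get_unexpected_and_missing_args` can be built on `inspect.signature`:

```python
# Sketch of api/utils.py (assumed; not shown in this diff)
import inspect


def get_unexpected_and_missing_args(func, provided: set) -> tuple[list, list]:
    """Compare provided kwarg names against func's signature."""
    params = inspect.signature(func).parameters
    required = {name for name, p in params.items() if p.default is inspect.Parameter.empty}
    return sorted(provided - set(params)), sorted(required - provided)
```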