Skip to content

Commit

Permalink
Merge pull request #281 from lsst-sqre/tickets/DM-45281
Browse files Browse the repository at this point in the history
DM-45281: Add UWS support library
  • Loading branch information
rra authored Aug 2, 2024
2 parents a44029a + 84375c2 commit 8a16028
Show file tree
Hide file tree
Showing 60 changed files with 7,742 additions and 71 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ repos:
- id: trailing-whitespace

- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.5.1
rev: v0.5.5
hooks:
- id: ruff
args: [--fix, --exit-non-zero-on-fix]
Expand Down
3 changes: 3 additions & 0 deletions changelog.d/20240723_170141_rra_DM_45281_queue.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
### New features

- Add new `safir.uws` and `safir.arq.uws` modules that provide the framework of an IVOA Universal Worker Service implementation.
3 changes: 3 additions & 0 deletions changelog.d/20240723_170311_rra_DM_45281_queue.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
### New features

- Add new `safir.testing.uws` module that provides a mock UWS job runner for testing UWS applications.
3 changes: 3 additions & 0 deletions changelog.d/20240802_102901_rra_DM_45281.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
### New features

- Add new `safir.arq.WorkerSettings` class that models the acceptable parameters for an arq `WorkerSettings` object or class that Safir applications have needed.
2 changes: 2 additions & 0 deletions docs/_rst_epilog.rst
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
.. _Alembic: https://alembic.sqlalchemy.org/en/latest/
.. _arq: https://arq-docs.helpmanual.io
.. _asyncpg: https://magicstack.github.io/asyncpg/current/
.. _Click: https://click.palletsprojects.com/
Expand Down Expand Up @@ -27,3 +28,4 @@
.. _tox-docker: https://tox-docker.readthedocs.io/en/latest/
.. _Uvicorn: https://www.uvicorn.org/
.. _virtualenvwrapper: https://virtualenvwrapper.readthedocs.io/en/stable/
.. _vo-models: https://vo-models.readthedocs.io/latest/
9 changes: 9 additions & 0 deletions docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ API reference
.. automodapi:: safir.arq
:include-all-objects:

.. automodapi:: safir.arq.uws
:include-all-objects:

.. automodapi:: safir.asyncio
:include-all-objects:
:inherited-members:
Expand Down Expand Up @@ -93,3 +96,9 @@ API reference

.. automodapi:: safir.testing.uvicorn
:include-all-objects:

.. automodapi:: safir.testing.uws
:include-all-objects:

.. automodapi:: safir.uws
:include-all-objects:
6 changes: 6 additions & 0 deletions docs/documenteer.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ nitpick_ignore = [
["py:obj", "JobMetadata.id"],
["py:class", "pydantic.BaseModel"],
["py:class", "BaseModel"],
# arq doesn't provide documentation for all of its types.
["py:class", "arq.cron.CronJob"],
["py:class", "arq.typing.StartupShutdown"],
["py:class", "arq.typing.WorkerCoroutine"],
["py:class", "arq.worker.Function"],
# sphinx-automodapi apparently doesn't recognize TypeAlias as an object
# that should have generated documentation, even with include-all-objects.
["py:obj", "safir.pydantic.EnvAsyncPostgresDsn"],
Expand All @@ -43,6 +48,7 @@ python = "https://docs.python.org/3/"
redis = "https://redis-py.readthedocs.io/en/stable/"
structlog = "https://www.structlog.org/en/stable/"
sqlalchemy = "https://docs.sqlalchemy.org/en/latest/"
vomodels = "https://vo-models.readthedocs.io/latest/"

[sphinx.linkcheck]
ignore = [
Expand Down
11 changes: 9 additions & 2 deletions docs/user-guide/arq.rst
Original file line number Diff line number Diff line change
Expand Up @@ -168,15 +168,20 @@ The :file:`src/yourapp/worker/main.py` module looks like:
on_shutdown = shutdown
The ``WorkerSettings`` class is where you configure the queue and declare worker functions.
It can be either an object or a class.
If it is a class, such as in the above example, the settings must be the default values of its class variables.
See `arq.worker.Worker` for details.

The `safir.arq.WorkerSettings` class defines the subset of the expected structure of this class or object that Safir applications have needed to date.
If you wish, you can define an instance of that class at the module level instead of defining a class as in the example above.

The ``on_startup`` and ``on_shutdown`` handlers are ideal places to set up (and tear down) worker state, including network and database clients.
The context variable, ``ctx``, passed to these functions are also passed to the worker functions.

If you want to allow jobs to be aborted, add ``allow_abort_jobs = True`` to the ``WorkerSettings`` class.
If you want to allow jobs to be aborted, add ``allow_abort_jobs = True`` to ``WorkerSettings``.
If a job is already running when it is aborted, it will be cancelled using asyncio task cancellation, which means that `asyncio.CancelledError` will be raised inside the job at the next opportunity.

To run a worker, you run your application's Docker image with the ``arq`` command, followed by the fully-qualified namespace of the ``WorkerSettings`` class.
To run a worker, you run your application's Docker image with the ``arq`` command, followed by the fully-qualified name of the ``WorkerSettings`` class or object.

Using the arq dependency in endpoint handlers
---------------------------------------------
Expand Down Expand Up @@ -248,6 +253,8 @@ The `safir.dependencies.arq.arq_dependency` dependency provides your FastAPI end
For information on the metadata available from jobs, see `JobMetadata` and `JobResult`.

.. _arq-testing:

Testing applications with an arq queue
======================================

Expand Down
2 changes: 2 additions & 0 deletions docs/user-guide/database.rst
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,8 @@ Here is an example of updating a DateTime field in the database:
job = (await session.execute(stmt)).scalar_one()
job.destruction_time = datetime_to_db(destruction_time)
.. _database-testing:

Testing applications that use a database
========================================

Expand Down
2 changes: 2 additions & 0 deletions docs/user-guide/gcs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ If not given, the default is one hour.
The path to the Google Cloud Storage object for which to create a signed URL must be an S3 URL.
The second argument to `~safir.gcs.SignedURLService.signed_url` is the MIME type of the underlying object, which will be encoded in the signed URL.

.. _gcs-testing:

Testing with mock Google Cloud Storage
======================================

Expand Down
1 change: 1 addition & 0 deletions docs/user-guide/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,4 @@ User guide
github-apps/index
click
asyncio-queue
uws/index
2 changes: 1 addition & 1 deletion docs/user-guide/set-up-from-template.rst
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ The variables are:
A Docker Hub Personal Access Token associated with ``DOCKER_USERNAME``.
`Create a dedicated token <https://docs.docker.com/security/for-developers/access-tokens/>`__ specifically for your project's GitHub Actions workflow.

After setting these secrets, re-run the GitHub Action by `re-running the workflow job from the GitHub Actions UI <https://docs.github.com/en/actions/managing-workflow-runs/re-running-workflows-and-jobs>`__ or by pushing a new commit to GitHub.
After setting these secrets, re-run the GitHub Action by `re-running the workflow job from the GitHub Actions UI <https://docs.github.com/en/actions/managing-workflow-runs-and-deployments/managing-workflow-runs/re-running-workflows-and-jobs>`__ or by pushing a new commit to GitHub.

8. Try the local test commands
==============================
Expand Down
252 changes: 252 additions & 0 deletions docs/user-guide/uws/create-a-service.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,252 @@
.. currentmodule:: safir.uws

##########################
Creating a new UWS service
##########################

To create a new service that uses the Safir UWS library for its API, first create a new FastAPI Safir application.
The easiest way to do this is to follow the instructions in :ref:`create-from-template`.
Select the ``UWS`` flavor.

Then, flesh out the application by following these steps:

#. :doc:`Define the API parameters <define-inputs>`
#. :doc:`Define the parameter models <define-models>`
#. :doc:`Write the backend worker <write-backend>`
#. :doc:`Write the test suite <testing>`

If you use the template and select the ``UWS`` flavor, all of the steps below will be done for you and you can skip the rest of this page.
Read on if you're curious about what the ``UWS`` flavor sets up, or if you're converting an already-existing FastAPI application.

.. _uws-config:

Add UWS configuration options
=============================

UWS applications have several standard configuration options that you will want to include in your application's overall configuration.
You will also need to add a method to creete the `UWSConfig` object from your application configuration and to create a global `UWSApplication` object.
These need to be added to :file:`config.py`.

First, add additional configuration settings for UWS to ``Config`` by changing the class to inherit from `UWSAppSettings`.
This will add standard configuration options most services will need and provide helper methods and properties.

.. code-block:: python
:caption: config.py
from safir.uws import UWSAppSettings
class Config(UWSAppSettings): ...
Second, add a property to ``Config`` that returns the UWS configuration.
For some of these settings, you won't know the values yet.
You will be able to fill in the value of ``parameters_type`` after reading :doc:`define-models`, the values of ``async_post_route`` and optionally ``sync_get_route`` and ``sync_post_route`` after reading :doc:`define-inputs`, and the value of ``worker`` after reading :doc:`write-backend`.
For now, you can just insert placeholder values.

.. code-block:: python
:caption: config.py
:emphasize-lines: 1,7-15
from safir.uws import UWSAppSettings, UWSConfig, UWSRoute
class Config(UWSAppSettings):
...
@property
def uws_config(self) -> UWSConfig:
return self.build_uws_config(
async_post_route=UWSRoute(...),
parameters_type=...,
sync_get_route=UWSRoute(...),
sync_post_route=UWSRoute(...),
worker=...,
)
See `UWSAppSettings.build_uws_config` for all of the possible settings.

Third, at the bottom of :file:`config.py`, create the `UWSApplication` object and store it in ``uws``, which should be an exported symbol (listed in ``__all__``).

.. code-block:: python
:caption: config.py
:emphasize-lines: 1,8-9
from safir.uws import UWSApplication
...
config = Config()
"""Configuration for example."""
uws = UWSApplication(config.uws_config)
"""The UWS application for this service."""
Set up the FastAPI application
==============================

The Safir UWS library must be initialized when the application starts, and requires some additional FastAPI middleware and error handlers.
These need to be added to :file:`main.py`.

First, initialize the UWS application in the ``lifespan`` function:

.. code-block:: python
:caption: main.py
:emphasize-lines: 1,6,8
from .config import uws
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
await uws.initialize_fastapi()
yield
await uws.shutdown_fastapi()
await http_client_dependency.aclose()
Second, install the UWS routes into the external router before including it in the application:

.. code-block:: python
:caption: main.py
:emphasize-lines: 3
# Attach the routers.
app.include_router(internal_router)
uws.install_handlers(external_router)
app.include_router(external_router, prefix=f"{config.path_prefix}")
Third, install the UWS middleware and error handlers.

.. code-block:: python
:caption: main.py
:emphasize-lines: 3,5-6
# Add middleware.
app.add_middleware(XForwardedMiddleware)
uws.install_middleware(app)
# Install error handlers.
uws.install_error_handlers(app)
Add a command-line interface
============================

The UWS implementation uses a PostgreSQL database to store job status.
Your application will need a mechanism to initialize that database with the desired schema.
The simplest way to do this is to add a command-line interface for your application with an ``init`` command that initializes the database.

.. note::

This approach has inherent race conditions and cannot handle database schema upgrades.
It will be replaced with a more sophisticated approach using Alembic_ once that support is ready.

First, create a new :file:`cli.py` file in your application with the following contents:

.. code-block:: python
:caption: cli.py
import click
import structlog
from safir.asyncio import run_with_asyncio
from safir.click import display_help
from .config import uws
@click.group(context_settings={"help_option_names": ["-h", "--help"]})
@click.version_option(message="%(version)s")
def main() -> None:
"""Administrative command-line interface for example."""
@main.command()
@click.argument("topic", default=None, required=False, nargs=1)
@click.pass_context
def help(ctx: click.Context, topic: str | None) -> None:
"""Show help for any command."""
display_help(main, ctx, topic)
@main.command()
@click.option(
"--reset", is_flag=True, help="Delete all existing database data."
)
@run_with_asyncio
async def init(*, reset: bool) -> None:
"""Initialize the database storage."""
logger = structlog.get_logger("example")
await uws.initialize_uws_database(logger, reset=reset)
Look for the instances of ``example`` and replace them with the name of your application.

Second, register this interface with Python in :file:`pyproject.toml`:

.. code-block:: toml
:caption: pyproject.toml
[project.scripts]
example = "example.cli:main"
Again, replace ``example`` with the name of your application.

Third, change the :file:`Dockerfile` for your application to run a startup script rather than run :command:`uvicorn` directly:

.. code-block:: docker
:caption: Dockerfile
# Copy the startup script
COPY scripts/start-frontend.sh /start-frontend.sh
# Run the application.
CMD ["/start-frontend.sh"]
Finally, create the :file:`scripts/start-frontend.sh` file:

.. code-block:: bash
:caption: scripts/start-frontend.sh
#!/bin/bash
#
# Create the database and then start the server.
set -eu
example init
uvicorn example.main:app --host 0.0.0.0 --port 8080
Again, replace ``example`` with the name of your application.

Create the arq worker for database updates
==========================================

Your application will have two separate arq_ worker pods, one to do the actual work of your application and one to handle database updates and state tracking.
The code for the second worker is part of the UWS library, but you have to add a small amount of code to enable it and attach it to your application configuration.

Create a subdirectory named :file:`workers` in the source for your application with an empty :file:`workers/__init__.py` file.
Then, create :file:`workers/uws.py` with the following contents:

.. code-block:: python
:caption: workers/uws.py
import structlog
from safir.logging import configure_logging
from ..config import config, uws
configure_logging(
name="example", profile=config.profile, log_level=config.log_level
)
WorkerSettings = uws.build_worker(structlog.get_logger("example"))
"""arq configuration for the UWS database worker."""
Once again, replace ``example`` with the name of your application.

Next steps
==========

Now that you have set up the basic structure of your application, you can move on to the substantive parts.

- Define the API parameters: :doc:`define-inputs`
- Define the parameter models: :doc:`define-models`
- Write the backend worker :doc:`write-backend`
Loading

0 comments on commit 8a16028

Please sign in to comment.