From 7329ca703e1bf67b13d8aeafe1c48ca65866bb2b Mon Sep 17 00:00:00 2001 From: Russ Allbery Date: Wed, 24 Jul 2024 17:51:01 -0700 Subject: [PATCH] Add documentation for the UWS library Add a section to the user guide documenting how to use the UWS library. Add anchors to some other documentation pages to make linking easier. --- docs/_rst_epilog.rst | 2 + docs/user-guide/arq.rst | 2 + docs/user-guide/database.rst | 2 + docs/user-guide/gcs.rst | 2 + docs/user-guide/index.rst | 1 + docs/user-guide/uws/create-a-service.rst | 254 +++++++++++++++++++++++ docs/user-guide/uws/define-inputs.rst | 247 ++++++++++++++++++++++ docs/user-guide/uws/define-models.rst | 130 ++++++++++++ docs/user-guide/uws/index.rst | 27 +++ docs/user-guide/uws/testing.rst | 204 ++++++++++++++++++ docs/user-guide/uws/write-backend.rst | 175 ++++++++++++++++ safir/src/safir/uws/_config.py | 2 +- 12 files changed, 1047 insertions(+), 1 deletion(-) create mode 100644 docs/user-guide/uws/create-a-service.rst create mode 100644 docs/user-guide/uws/define-inputs.rst create mode 100644 docs/user-guide/uws/define-models.rst create mode 100644 docs/user-guide/uws/index.rst create mode 100644 docs/user-guide/uws/testing.rst create mode 100644 docs/user-guide/uws/write-backend.rst diff --git a/docs/_rst_epilog.rst b/docs/_rst_epilog.rst index 58401214..aa7ed87e 100644 --- a/docs/_rst_epilog.rst +++ b/docs/_rst_epilog.rst @@ -1,3 +1,4 @@ +.. _Alembic: https://alembic.sqlalchemy.org/en/latest/ .. _arq: https://arq-docs.helpmanual.io .. _asyncpg: https://magicstack.github.io/asyncpg/current/ .. _Click: https://click.palletsprojects.com/ @@ -27,3 +28,4 @@ .. _tox-docker: https://tox-docker.readthedocs.io/en/latest/ .. _Uvicorn: https://www.uvicorn.org/ .. _virtualenvwrapper: https://virtualenvwrapper.readthedocs.io/en/stable/ +.. 
_vo-models: https://vo-models.readthedocs.io/latest/ diff --git a/docs/user-guide/arq.rst b/docs/user-guide/arq.rst index ea787819..b0f7b72e 100644 --- a/docs/user-guide/arq.rst +++ b/docs/user-guide/arq.rst @@ -248,6 +248,8 @@ The `safir.dependencies.arq.arq_dependency` dependency provides your FastAPI end For information on the metadata available from jobs, see `JobMetadata` and `JobResult`. +.. _arq-testing: + Testing applications with an arq queue ====================================== diff --git a/docs/user-guide/database.rst b/docs/user-guide/database.rst index 5975bbd5..3d18f726 100644 --- a/docs/user-guide/database.rst +++ b/docs/user-guide/database.rst @@ -267,6 +267,8 @@ Here is an example of updating a DateTime field in the database: job = (await session.execute(stmt)).scalar_one() job.destruction_time = datetime_to_db(destruction_time) +.. _database-testing: + Testing applications that use a database ======================================== diff --git a/docs/user-guide/gcs.rst b/docs/user-guide/gcs.rst index 22bcdbd6..6c013323 100644 --- a/docs/user-guide/gcs.rst +++ b/docs/user-guide/gcs.rst @@ -40,6 +40,8 @@ If not given, the default is one hour. The path to the Google Cloud Storage object for which to create a signed URL must be an S3 URL. The second argument to `~safir.gcs.SignedURLService.signed_url` is the MIME type of the underlying object, which will be encoded in the signed URL. +.. _gcs-testing: + Testing with mock Google Cloud Storage ====================================== diff --git a/docs/user-guide/index.rst b/docs/user-guide/index.rst index 73c951c3..d129c78f 100644 --- a/docs/user-guide/index.rst +++ b/docs/user-guide/index.rst @@ -32,3 +32,4 @@ User guide github-apps/index click asyncio-queue + uws/index diff --git a/docs/user-guide/uws/create-a-service.rst b/docs/user-guide/uws/create-a-service.rst new file mode 100644 index 00000000..9cdf5836 --- /dev/null +++ b/docs/user-guide/uws/create-a-service.rst @@ -0,0 +1,254 @@ +.. 
currentmodule:: safir.uws + +########################## +Creating a new UWS service +########################## + +To create a new service that uses the Safir UWS library for its API, first create a new FastAPI Safir application. +The easiest way to do this is to follow the instructions in :ref:`create-from-template`. + +Then, flesh out the application by following these steps: + +#. Create the application structure +#. :doc:`Define the API parameters <define-inputs>` +#. :doc:`Define the parameter models <define-models>` +#. :doc:`Write the backend worker <write-backend>` +#. :doc:`Write the test suite <testing>` + +The first step is covered in this page. +The remaining steps are discussed in the linked pages. + +.. _uws-config: + +Add UWS configuration options +============================= + +UWS applications have several standard configuration options that you will want to include in your application's overall configuration. +You will also need to add a method to create the `UWSConfig` object from your application configuration and to create a global `UWSApplication` object. +These need to be added to :file:`config.py`. + +First, add additional configuration settings for UWS to ``Config`` by changing the class to inherit from `UWSAppSettings`. +This will add standard configuration options most services will need and provide helper methods and properties. + +.. code-block:: python + :caption: config.py + :emphasize-lines: 1,4 + + from safir.uws import UWSAppSettings + + + class Config(UWSAppSettings): ... + +Second, add a property to ``Config`` that returns the UWS configuration. +For some of these settings, you won't know the values yet. +You will be able to fill in the value of ``parameters_type`` after reading :doc:`define-models`, the values of ``async_post_route`` and optionally ``sync_get_route`` and ``sync_post_route`` after reading :doc:`define-inputs`, and the value of ``worker`` after reading :doc:`write-backend`. +For now, you can just insert placeholder values. + +.. 
code-block:: python + :caption: config.py + :emphasize-lines: 1,7-15 + + from safir.uws import UWSAppSettings, UWSConfig, UWSRoute + + + class Config(UWSAppSettings): + ... + + @property + def uws_config(self) -> UWSConfig: + return self.build_uws_config( + async_post_route=UWSRoute(...), + parameters_type=..., + sync_get_route=UWSRoute(...), + sync_post_route=UWSRoute(...), + worker=..., + ) + +See `UWSAppSettings.build_uws_config` for all of the possible settings. + +Third, at the bottom of :file:`config.py`, create the `UWSApplication` object and store it in ``uws``, which should be an exported symbol (listed in ``__all__``). + +.. code-block:: python + :caption: config.py + :emphasize-lines: 1,8-9 + + from safir.uws import UWSApplication + + ... + + config = Config() + """Configuration for example.""" + + uws = UWSApplication(config.uws_config) + """The UWS application for this service.""" + +Set up the FastAPI application +============================== + +The Safir UWS library must be initialized when the application starts, and requires some additional FastAPI middleware and error handlers. +These need to be added to :file:`main.py`. + +First, initialize the UWS application in the ``lifespan`` function: + +.. code-block:: python + :caption: main.py + :emphasize-lines: 1,6,8 + + from .config import uws + + + @asynccontextmanager + async def lifespan(app: FastAPI) -> AsyncIterator[None]: + await uws.initialize_fastapi() + yield + await uws.shutdown_fastapi() + await http_client_dependency.aclose() + +Second, install the UWS routes into the external router before including it in the application: + +.. code-block:: python + :caption: main.py + :emphasize-lines: 3 + + # Attach the routers. + app.include_router(internal_router) + uws.install_handlers(external_router) + app.include_router(external_router, prefix=f"{config.path_prefix}") + +Third, install the UWS middleware and error handlers. + +.. 
code-block:: python + :caption: main.py + :emphasize-lines: 3,5-6 + + # Add middleware. + app.add_middleware(XForwardedMiddleware) + uws.install_middleware(app) + + # Install error handlers. + uws.install_error_handlers(app) + +Add a command-line interface +============================ + +The UWS implementation uses a PostgreSQL database to store job status. +Your application will need a mechanism to initialize that database with the desired schema. +The simplest way to do this is to add a command-line interface for your application with an ``init`` command that initializes the database. + +.. note:: + + This approach has inherent race conditions and cannot handle database schema upgrades. + It will be replaced with a more sophisticated approach using Alembic_ once that support is ready. + +First, create a new :file:`cli.py` file in your application with the following contents: + +.. code-block:: python + :caption: cli.py + + import click + import structlog + from safir.asyncio import run_with_asyncio + from safir.click import display_help + + from .config import uws + + + @click.group(context_settings={"help_option_names": ["-h", "--help"]}) + @click.version_option(message="%(version)s") + def main() -> None: + """Administrative command-line interface for example.""" + + + @main.command() + @click.argument("topic", default=None, required=False, nargs=1) + @click.pass_context + def help(ctx: click.Context, topic: str | None) -> None: + """Show help for any command.""" + display_help(main, ctx, topic) + + + @main.command() + @click.option( + "--reset", is_flag=True, help="Delete all existing database data." + ) + @run_with_asyncio + async def init(*, reset: bool) -> None: + """Initialize the database storage.""" + logger = structlog.get_logger("example") + await uws.initialize_uws_database(logger, reset=reset) + +Look for the instances of ``example`` and replace them with the name of your application. 
+ +Second, register this interface with Python in :file:`pyproject.toml`: + +.. code-block:: toml + :caption: pyproject.toml + + [project.scripts] + example = "example.cli:main" + +Again, replace ``example`` with the name of your application. + +Third, change the :file:`Dockerfile` for your application to run a startup script rather than run :command:`uvicorn` directly: + +.. code-block:: docker + :caption: Dockerfile + :emphasize-lines: 1-2,5 + + # Copy the startup script + COPY scripts/start-frontend.sh /start-frontend.sh + + # Run the application. + CMD ["/start-frontend.sh"] + +Finally, create the :file:`scripts/start-frontend.sh` file: + +.. code-block:: bash + :caption: scripts/start-frontend.sh + + #!/bin/bash + # + # Create the database and then start the server. + + set -eu + + example init + uvicorn example.main:app --host 0.0.0.0 --port 8080 + +Again, replace ``example`` with the name of your application. + +Create the arq worker for database updates +========================================== + +Your application will have two separate arq_ worker pods, one to do the actual work of your application and one to handle database updates and state tracking. +The code for the second worker is part of the UWS library, but you have to add a small amount of code to enable it and attach it to your application configuration. + +Create a subdirectory named :file:`workers` in the source for your application with an empty :file:`workers/__init__.py` file. +Then, create :file:`workers/uws.py` with the following contents: + +.. code-block:: python + :caption: workers/uws.py + + import structlog + from safir.logging import configure_logging + + from ..config import config, uws + + + configure_logging( + name="example", profile=config.profile, log_level=config.log_level + ) + + WorkerSettings = uws.build_worker(structlog.get_logger("example")) + """arq configuration for the UWS database worker.""" + +Once again, replace ``example`` with the name of your application. 
+ +Next steps +========== + +Now that you have set up the basic structure of your application, you can move on to the substantive parts. + +- Define the API parameters: :doc:`define-inputs` +- Define the parameter models: :doc:`define-models` +- Write the backend worker: :doc:`write-backend` diff --git a/docs/user-guide/uws/define-inputs.rst b/docs/user-guide/uws/define-inputs.rst new file mode 100644 index 00000000..367cbddb --- /dev/null +++ b/docs/user-guide/uws/define-inputs.rst @@ -0,0 +1,247 @@ +.. currentmodule:: safir.uws + +####################### +Defining service inputs +####################### + +Your UWS service will take one or more input parameters. +The UWS library cannot know what those parameters are, so you will need to define them and pass that configuration into the UWS library configuration. +This is done by writing a FastAPI dependency that returns a list of input parameters as key/value pairs. + +What parameters look like +========================= + +UWS input parameters for a job are a list of key/value pairs. +The value is always a string. +Other data types are not directly supported. +If your service needs a different data type as a parameter value, you will need to accept it as a string and then parse it into a more complex structure. +See :doc:`define-models` for how to do that. + +All FastAPI dependencies provided by your application must return a list of `UWSJobParameter` objects. +The ``parameter_id`` attribute is the key and the ``value`` attribute is the value. + +The key (the ``parameter_id``) is case-insensitive in the input. +When creating a `UWSJobParameter` object, it should be converted to lowercase (by using ``.lower()``, for example) so that the rest of the service can assume the lowercase form. + +UWS allows the same ``parameter_id`` to occur multiple times with different values. 
+For example, multiple ``id`` parameters may specify multiple input objects for a bulk operation that processes all of the input objects at the same time. + +Ways to create jobs +=================== + +There are three possible ways to create a job in UWS: ``POST`` to create an async job, ``POST`` to create a sync job, and ``GET`` to create a sync job. + +An async job creates a job record and starts the operation in the background. +The client then needs to wait or poll for the job to complete and can retrieve the results from the job record. +Multiple results are supported, and each will have its own identifier. + +A sync job creates the job, waits for it to complete, and returns the result. +Sync jobs can only be used for operations that complete relatively quickly, because many HTTP clients will not wait for long for a response. +Browsers will normally not wait more than a minute at most, and sync jobs are not suitable for any operation that takes longer than five minutes. +Sync jobs are not supported by default, but can be easily enabled. + +Sync jobs can be created via either ``POST`` or ``GET``. +You can pick whether your application will support sync ``POST``, sync ``GET``, both, or neither. +Supporting ``GET`` makes it easier for people to assemble ad hoc jobs by writing the URL directly in their web browser. +However, due to unfixable web security reasons, ``GET`` jobs can be created by any malicious site on the Internet, and therefore should not be supported if the operation of your service is destructive, expensive, or dangerous if performed by unauthorized people. + +For each supported way to create a job, your application must provide a FastAPI dependency that reads input parameters via that method and returns a list of `UWSJobParameter` objects. + +async POST dependency +--------------------- + +Supporting async ``POST`` is required. +First, write a FastAPI dependency that accepts the input parameters for your job as `form parameters `__. 
+Conventionally, this dependency goes into :file:`dependencies.py`. +Here is an example for a SODA service that performs circular cutouts: + +.. code-block:: python + :caption: dependencies.py + + from typing import Annotated + + from fastapi import Depends, Form + from safir.uws import UWSJobParameter, uws_post_params_dependency + + + async def post_params_dependency( + *, + id: Annotated[ + str | list[str] | None, + Form( + title="Source ID", + description=( + "Identifiers of images from which to make a cutout. This" + " parameter is mandatory." + ), + ), + ] = None, + circle: Annotated[ + str | list[str] | None, + Form( + title="Cutout circle positions", + description=( + "Circles to cut out. The value must be the ra and dec of the" + " center of the circle and then the radius, as" + " double-precision floating point numbers expressed as" + " strings and separated by spaces." + ), + ), + ] = None, + params: Annotated[ + list[UWSJobParameter], Depends(uws_post_params_dependency) + ], + ) -> list[UWSJobParameter]: + """Parse POST parameters into job parameters for a cutout.""" + return [p for p in params if p.parameter_id in {"id", "circle"}] + +This first declares the input parameters, with full documentation, as FastAPI ``Form`` parameters. +Note that the type is ``str | list[str]``, which allows the parameter to be specified multiple times. + +Unfortunately, supporting UWS's case-insensitivity is obnoxious in FastAPI. +This is the purpose for the extra ``params`` argument that uses `uws_post_params_dependency`. +The explicitly-declared parameters are there only to generate API documentation and are not used directly. +Instead, the ``params`` argument collects all of the input form parameters and converts them into a canonical form for you, regardless of the case used for the key. +The body of the function then only needs to filter those parameters down to the ones that are relevant for your application and return them. 
+ +You do not need to do any input validation here. +This will be done later as part of converting the input parameters to your parameter model, as defined in :doc:`define-models`. + +async POST configuration +------------------------ + +Finally, you need to tell the UWS library about this configuration, and also provide some additional metadata for the route. +This is done in the ``async_post_route`` argument to `UWSAppSettings.build_uws_config` as mentioned in :ref:`uws-config`. +Now you can replace the ``...`` in that example with a full `UWSRoute` object. + +Here is an example for the same cutout service: + +.. code-block:: python + :caption: config.py + + from safir.uws import UWSRoute + + from .dependencies import post_params_dependency + + + async_post_route = UWSRoute( + dependency=post_params_dependency, + summary="Create async cutout job", + description="Create a new UWS job to perform an image cutout", + ) + +This would then be passed as the ``async_post_route`` argument. + +The ``summary`` and ``description`` attributes are only used to generate the API documentation. +They contain a brief summary and a longer description of the async ``POST`` route and will be copied into the generated OpenAPI specification for the service. + +sync POST +--------- + +Supporting sync ``POST`` is very similar: define a FastAPI dependency that accepts ``POST`` parameters and returns a list of `UWSJobParameter` objects, and then define a `UWSRoute` object including that dependency and pass it as the ``sync_post_route`` argument to `UWSAppSettings.build_uws_config`. +By default, sync ``POST`` is not supported. + +Normally, the input parameters for sync ``POST`` will be the same as the input parameters for async ``POST``, so you can reuse the same FastAPI dependency. + +Here is an example for the same cutout service: + +.. 
code-block:: python + :caption: config.py + + from safir.uws import UWSRoute + + from .dependencies import post_params_dependency + + + sync_post_route = UWSRoute( + dependency=post_params_dependency, + summary="Synchronous cutout", + description=( + "Synchronously request a cutout. This will wait for the" + " cutout to be completed and return the resulting image" + " as a FITS file. The image will be returned via a" + " redirect to a URL at the underlying object store." + ), + ) + +This would then be passed as the ``sync_post_route`` argument. + +sync GET +-------- + +Supporting sync ``GET`` follows the same pattern, but here you will need to define a separate dependency that takes query parameters rather than form parameters. +Here is an example dependency for a cutout service: + +.. code-block:: python + :caption: dependencies.py + + from typing import Annotated + + from fastapi import Depends, Query, Request + + + async def get_params_dependency( + *, + id: Annotated[ + list[str], + Query( + title="Source ID", + description=( + "Identifiers of images from which to make a cutout. This" + " parameter is mandatory." + ), + ), + ], + circle: Annotated[ + list[str], + Query( + title="Cutout circle positions", + description=( + "Circles to cut out. The value must be the ra and dec of" + " the center of the circle and then the radius, as" + " double-precision floating point numbers expressed as" + " strings and separated by spaces." + ), + ), + ], + request: Request, + ) -> list[UWSJobParameter]: + """Parse GET parameters into job parameters for a cutout.""" + return [ + UWSJobParameter(parameter_id=k, value=v) + for k, v in request.query_params.items() + if k in {"id", "circle"} + ] + +This code is somewhat simpler and doesn't need `uws_post_params_dependency`. +The UWS library installs FastAPI middleware that canonicalizes the case of all query parameter keys, so your application can assume they are lowercase. 
+ +As in the other cases, you will then need to pass a `UWSRoute` object as the ``sync_get_route`` argument to `UWSAppSettings.build_uws_config`. +Here is an example: + +.. code-block:: python + :caption: config.py + + from safir.uws import UWSRoute + + from .dependencies import get_params_dependency + + + sync_get_route = UWSRoute( + dependency=get_params_dependency, + summary="Synchronous cutout", + description=( + "Synchronously request a cutout. This will wait for the" + " cutout to be completed and return the resulting image" + " as a FITS file. The image will be returned via a" + " redirect to a URL at the underlying object store." + ), + ) + +This would then be passed as the ``sync_get_route`` argument. + +Next steps +========== + +- Define the parameter models: :doc:`define-models` +- Write the backend worker: :doc:`write-backend` diff --git a/docs/user-guide/uws/define-models.rst b/docs/user-guide/uws/define-models.rst new file mode 100644 index 00000000..b897870b --- /dev/null +++ b/docs/user-guide/uws/define-models.rst @@ -0,0 +1,130 @@ +.. currentmodule:: safir.uws + +########################### +Define job parameter models +########################### + +UWS models all parameters as simple lists of key/value pairs with string values. +However, for internal purposes, most applications will want a more sophisticated parameter model than that, with better input validation. +The frontend should parse and validate the input parameters so that it can fail quickly if they are invalid, rather than creating a job, dispatching it, and only then having it fail due to invalid parameters. + +UWS applications therefore must define two models for input parameters, both Pydantic models. +The first is the model of parameters as provided by users, and is used to validate the input parameters. +The second is the model that will be passed to the backend worker. + +.. 
_uws-worker-model: + +Worker parameter model +====================== + +The UWS library uses a Pydantic model to convey the job parameters to the backend worker. +This Pydantic model is serialized to a JSON-compatible dictionary before being sent to the backend worker and then deserialized back into a Pydantic model in the backend. +Every field must therefore be JSON-serializable and deserializable. + +Here is a simple example for a cutout service: + +.. code-block:: python + :caption: models/domain/cutout.py + + from pydantic import BaseModel, Field + + + class Point(BaseModel): + ra: float = Field(..., title="ICRS ra in degrees") + dec: float = Field(..., title="ICRS dec in degrees") + + + class WorkerCircleStencil(BaseModel): + center: Point = Field(..., title="Center") + radius: float = Field(..., title="Radius") + + + class WorkerCutout(BaseModel): + dataset_ids: list[str] + stencils: list[WorkerCircleStencil] + +This model will be imported by both the frontend and the backend worker, and therefore must not depend on any of the other frontend code or any Python libraries that will not be present in the worker backend. + +Using complex data types in the worker model +-------------------------------------------- + +It will often be tempting to use more complex data types in the worker model because they are closer to the underlying implementation code and allow more validation to be performed in the frontend. +For example, one may wish the worker model to use astropy_ ``Angle`` and ``SkyCoord`` data types instead of simple Python floats. + +.. _astropy: https://www.astropy.org/ + +This is possible, but be careful of serialization. +Astropy types do not serialize to JSON by default, so you will need to add serialization and deserialization support using Pydantic's facilities. 
+ +If you do this, consider adding a test case for your application that serializes your worker model to JSON, deserializes it back from JSON, and verifies that the resulting object matches the original object. + +Input parameter model +===================== + +Every UWS application must define a Pydantic model for its input parameters. +This model must inherit from `ParametersModel`. +In addition to defining the parameter model, it must provide two methods: a class method named ``from_job_parameters`` that takes as input the list of `UWSJobParameter` objects and returns an instance of the model, and an instance method named ``to_worker_parameters`` that converts the model to the one that will be passed to the backend worker (see :ref:`uws-worker-model`). + +Often, the worker parameter model will look very similar to the input parameter model. +They are still kept separate, since the input parameter model defines the API and the worker model defines the interface to the backend. +Over the lifetime of a service, those two interfaces often have to diverge, and it's cleaner to maintain that separation from the start. + +Here is an example of a simple model for a cutout service: + +.. 
code-block:: python + :caption: models/cutout.py + + from typing import Self + + from pydantic import Field + from safir.uws import ParameterParseError, ParametersModel, UWSJobParameter + + from .domain.cutout import Point, WorkerCircleStencil, WorkerCutout + + + class CircleStencil(WorkerCircleStencil): + @classmethod + def from_string(cls, params: str) -> Self: + ra, dec, radius = (float(p) for p in params.split()) + return cls(center=Point(ra=ra, dec=dec), radius=radius) + + + class CutoutParameters(ParametersModel[WorkerCutout]): + ids: list[str] = Field(..., title="Dataset IDs") + stencils: list[CircleStencil] = Field(..., title="Cutout stencils") + + @classmethod + def from_job_parameters(cls, params: list[UWSJobParameter]) -> Self: + ids = [] + stencils = [] + try: + for param in params: + if param.parameter_id == "id": + ids.append(param.value) + else: + stencils.append(CircleStencil.from_string(param.value)) + except Exception as e: + msg = f"Invalid cutout parameter: {type(e).__name__}: {e!s}" + raise ParameterParseError(msg, params) from e + return cls(ids=ids, stencils=stencils) + + def to_worker_parameters(self) -> WorkerCutout: + return WorkerCutout(dataset_ids=self.ids, stencils=self.stencils) + +Notice that the input parameter model reuses some models from the worker (``Point`` and ``WorkerCircleStencil``), but adds a new class method to the latter via inheritance. +It also uses a different parameter for the dataset IDs (``ids`` instead of ``dataset_ids``), which is a trivial example of the sort of divergence one might see between input models and backend worker models. + +The input models are also responsible for input parsing and validation (note the ``from_job_parameters`` and ``from_string`` methods) and converting to the worker model. 
+The worker model should be in a separate file and kept as simple as possible, since it has to be imported by the backend worker, which may not have the dependencies installed to be able to import other frontend code. + +Update the application configuration +==================================== + +Now that you've defined the parameters model, you can update :file:`config.py` to pass that model to `UWSAppSettings.build_uws_config`, as mentioned in :ref:`uws-config`. +Set the ``parameters_type`` argument to the class name of the parameters model. +In the example above, that would be ``CutoutParameters``. + +Next steps +========== + +- Write the backend worker: :doc:`write-backend` diff --git a/docs/user-guide/uws/index.rst b/docs/user-guide/uws/index.rst new file mode 100644 index 00000000..85a33a81 --- /dev/null +++ b/docs/user-guide/uws/index.rst @@ -0,0 +1,27 @@ +######################### +Building UWS applications +######################### + +Universal Worker Service (UWS) is an `IVOA standard `__ for writing possibly-asynchronous web services for astronomy. +Safir provides a comprehensive library for writing services that use this standard. +In addition to implementing the UWS protocol, this library dispatches the underlying work to an arq_ worker, allowing that worker to be based on a Rubin Science Pipelines stack container and reuse the astronomy code written for Rubin Observatory. + +Applications built with this framework have three components: + +#. A frontend web service that takes requests that follow the UWS protocol. +#. A backend arq_ worker, possibly running on a different software stack, that does the work. +#. A database arq_ worker that handles bookkeeping and result processing for the backend worker. + +Incoming requests are turned into arq_ jobs, processed by the backend worker, uploaded to Google Cloud Storage, recorded in a database, and then returned to the user via a frontend that reads the result URLs and other metadata from the database. 
+ +Guides +====== + +.. toctree:: + :titlesonly: + + create-a-service + define-inputs + define-models + write-backend + testing diff --git a/docs/user-guide/uws/testing.rst b/docs/user-guide/uws/testing.rst new file mode 100644 index 00000000..8c79f599 --- /dev/null +++ b/docs/user-guide/uws/testing.rst @@ -0,0 +1,204 @@ +.. currentmodule:: safir.testing.uws + +######################## +Testing UWS applications +######################## + +UWS applications are arq_ applications, and therefore should follow the testing advice in :ref:`arq-testing`. +This includes testing the frontend and the backend worker separately. + +The :mod:`safir.testing.uws` module provides some additional support to make it easier to test the frontend. + +Frontend testing fixtures +========================= + +The frontend of a UWS application assumes that arq_ will execute both jobs and the database worker that recovers the results of a job and stores them in the database. +During testing of the frontend, arq will not be running, and therefore this execution must be simulated. +This is done with the `MockUWSJobRunner` class, but it requires some setup. + +Mock the arq queue +------------------ + +First, the application must be configured to use a `~safir.arq.MockArqQueue` class instead of one based on Redis. +This stores all queued jobs in memory and provides some test-only methods to manipulate them. + +To do this, first set up a fixture in :file:`tests/conftest.py` that provides a mock arq queue: + +.. code-block:: python + :caption: tests/conftest.py + + import pytest + from safir.arq import MockArqQueue + + + @pytest.fixture + def arq_queue() -> MockArqQueue: + return MockArqQueue() + +Then, configure the application to use that arq queue instead of the default one in the ``app`` fixture. + +.. 
code-block:: python + :caption: tests/conftest.py + :emphasize-lines: 8,14 + + from collections.abc import AsyncIterator + + from asgi_lifespan import LifespanManager + from fastapi import FastAPI + from safir.arq import MockArqQueue + + from example import main + from example.config import uws + + + @pytest_asyncio.fixture + async def app(arq_queue: MockArqQueue) -> AsyncIterator[FastAPI]: + async with LifespanManager(main.app): + uws.override_arq_queue(arq_queue) + yield main.app + +Provide a test database +----------------------- + +UWS relies on a database in which to store job information and results. +Follow the instructions in :ref:`database-testing` to use tox-docker_ to create a test PostgreSQL database, but skip the instructions there for initializing the database. +Instead, use the UWS library to initialize the resulting database: + +.. code-block:: python + :caption: tests/conftest.py + :emphasize-lines: 3,14-15 + + from collections.abc import AsyncIterator + + import structlog + from asgi_lifespan import LifespanManager + from fastapi import FastAPI + from safir.arq import MockArqQueue + + from example import main + from example.config import uws + + + @pytest_asyncio.fixture + async def app(arq_queue: MockArqQueue) -> AsyncIterator[FastAPI]: + logger = structlog.get_logger("example") + await uws.initialize_uws_database(logger, reset=True) + async with LifespanManager(main.app): + uws.override_arq_queue(arq_queue) + yield main.app + +Mock Google Cloud Storage +------------------------- + +The UWS library assumes results are in Google Cloud Storage and creates signed URLs to allow the client to retrieve those results. +This support needs to be mocked out during testing. +Do this by adding the following fixture to :file:`tests/conftest.py`: + +.. 
code-block:: python + :caption: tests/conftest.py + + from datetime import timedelta + + import pytest + from safir.testing.gcs import MockStorageClient, patch_google_storage + + + @pytest.fixture(autouse=True) + def mock_google_storage() -> Iterator[MockStorageClient]: + yield from patch_google_storage( + expected_expiration=timedelta(minutes=15), bucket_name="some-bucket" + ) + +See :ref:`gcs-testing` for more information. + +Provide a mock arq queue runner +------------------------------- + +Finally, you can create a fixture that provides a mock arq queue runner. +This will simulate not only the execution of a backend worker that returns some result, but also the collection of that result and subsequent database updates. + +.. code-block:: python + :caption: tests/conftest.py + + from collections.abc import AsyncIterator + + import pytest_asyncio + from safir.arq import MockArqQueue + from safir.testing.uws import MockUWSJobRunner + + from example.config import config + + + @pytest_asyncio.fixture + async def runner( + arq_queue: MockArqQueue, + ) -> AsyncIterator[MockUWSJobRunner]: + async with MockUWSJobRunner(config.uws_config, arq_queue) as runner: + yield runner + +Writing a frontend test +======================= + +Now, all the pieces are in place to write a meaningful test of the frontend. +You can use the methods of `MockUWSJobRunner` to change the state of a mocked backend job and set the results that it returns. +Here is an example of a test of a hypothetical cutout service. + +.. 
code-block:: python + :caption: tests/handlers/async_test.py + + import pytest + from httpx import AsyncClient + from safir.testing.uws import MockUWSJobRunner + + + @pytest.mark.asyncio + async def test_create_job( + client: AsyncClient, runner: MockUWSJobRunner + ) -> None: + r = await client.post( + "/api/cutout/jobs", + headers={"X-Auth-Request-User": "someone"}, + data={"ID": "1:2:band:value", "Pos": "CIRCLE 0 1 2"}, + ) + assert r.status_code == 303 + assert r.headers["Location"] == "https://example.com/api/cutout/jobs/1" + await runner.mark_in_progress("someone", "1") + + async def run_job() -> None: + results = [ + UWSJobResult( + result_id="cutout", + url="s3://some-bucket/some/path", + mime_type="application/fits", + ) + ] + await runner.mark_complete("someone", "1", results, delay=0.2) + + _, r = await asyncio.gather( + run_job(), + client.get( + "/api/cutout/jobs/1", + headers={"X-Auth-Request-User": "someone"}, + params={"wait": 2, "phase": "EXECUTING"}, + ), + ) + assert r.status_code == 200 + assert "https://example.com/some/path" in r.text + +Note the use of `MockUWSJobRunner.mark_complete` with a ``delay`` argument and `asyncio.gather` to simulate a job that takes some time to complete so that the client request to wait for job completion can be tested. + +A more sophisticated test would check the XML results returned by the API against the UWS XML schema. +This can be done using the models provided by vo-models_. + +Testing the backend worker +========================== + +The backend divides naturally into two pieces: the wrapper code that accepts the arguments in the format passed by the UWS library, handles exceptions, and constructs `~safir.arq.uws.WorkerResult` objects; and the code that performs the underlying operation of the service. + +To make testing easier, it's usually a good idea to separate those two pieces. 
+The wrapper that handles talking to the UWS library and translating exceptions can be included in the source of the application. +The underlying code to perform the operation is often best maintained in a library of domain-specific code. +For example, for Rubin Observatory, this will usually be a function in a Science Pipelines package with its own separate tests. + +If the code is structured this way, there won't be much to test in the backend worker wrapper and often one can get away with integration tests. +If more robust tests are desired, though, the backend worker function is a simple function that can be called and tested directly by the test suite, possibly after mocking out the underlying library function. diff --git a/docs/user-guide/uws/write-backend.rst b/docs/user-guide/uws/write-backend.rst new file mode 100644 index 00000000..6f96637f --- /dev/null +++ b/docs/user-guide/uws/write-backend.rst @@ -0,0 +1,175 @@ +.. currentmodule:: safir.arq.uws + +######################## +Write the backend worker +######################## + +The backend worker is the heart of the application. +This is where you do the real work of the service. + +A backend worker is a sync Python function that takes three arguments as input: + +#. The worker parameters model (see :ref:`uws-worker-model`). +#. Metadata about the UWS job, including authentication credentials from the user's request that can be used to make requests of other services. +#. A structlog_ `~structlog.stdlib.BoundLogger` to use for logging. + +The backend worker must return a list of `WorkerResult` objects. +Each contains a name for the result, the ``s3`` or ``gs`` URL to where the result is stored in a Google Cloud Storage bucket, and the MIME type and (optionally) the size of the result. + +All results must be stored in Google Cloud Storage currently. +No other backend store for results is supported. +The frontend for your application will use signed GCS URLs to allow the client to retrieve the results. 
+ +Structure of the worker +======================= + +The backend worker should be defined in a single file in the :file:`workers` directory of your application source. +This file may include other modules in your application source. +It will, for example, generally include the module that defines the worker parameter model created in :ref:`uws-worker-model`. + +However, be aware that the worker will run in a different Python environment than the frontend (usually a Rubin pipelines container). +It therefore must not include any portion of the application source that requires additional dependencies such as FastAPI, and it must not include general Safir modules. +Normally it should only include the worker parameter model, :mod:`safir.arq`, :mod:`safir.arq.uws`, and any other Python modules that are available in the backend Python environment. + +The backend worker should also target the version of Python provided by its base container, which may be different (usually older) than the version of Python used by the frontend. + +Here is the rough shape of the module that defines this worker: + +.. code-block:: python + :caption: workers/example.py + + import os + + import structlog + from safir.arq.uws import ( + WorkerConfig, + WorkerJobInfo, + WorkerResult, + build_worker, + ) + + + def example( + params: WorkerCutout, info: WorkerJobInfo, logger: BoundLogger + ) -> list[WorkerResult]: ... + + + WorkerSettings = build_worker( + example, + WorkerConfig( + arq_mode=ArqMode.production, + arq_queue_url=os.environ["EXAMPLE_ARQ_QUEUE_URL"], + arq_queue_password=os.getenv("EXAMPLE_ARQ_QUEUE_PASSWORD"), + grace_period=timedelta( + seconds=int(os.environ["EXAMPLE_GRACE_PERIOD"]) + ), + parameters_class=WorkerCutout, + timeout=timedelta(seconds=int(os.environ["EXAMPLE_TIMEOUT"])), + ), + structlog.get_logger("example"), + ) + +In this case, the worker function is ``example``. +It must return a list of `WorkerResult` objects. 
+For sync requests, if supported, the first element of that list is the one that will be returned as the result of the request. + +The call to `build_worker` creates the necessary arq_ configuration that allows this module to be used as the module defining an arq worker. +Notice that some configuration information has to be duplicated from the main application configuration, but cannot reuse the same model because pydantic-settings may not be available in the worker. +The corresponding environment variables are therefore used directly. + +The ``parameters_class`` argument must be the model defined in :ref:`uws-worker-model`. + +Reporting errors +================ + +The :mod:`safir.arq.uws` module provides exceptions that should be used to wrap all errors encountered during backend processing. +These exceptions ensure that the backtrace of the error is serialized properly and included in the job results so that it can be reported to the user via the frontend. +They are `WorkerFatalError`, `WorkerTransientError`, `WorkerTimeoutError`, and `WorkerUsageError`. + +Their meanings are somewhat obvious. +A transient error is one that may resolve if the user simply submits the job again. +`WorkerUsageError` should be used in cases where the job parameters are invalid in a way that couldn't be detected in the frontend. + +Except for `WorkerTimeoutError`, all of these exceptions take two arguments and a flag. +The first argument is the normal exception message. +The second, optional argument can be used to provide additional details about the failure. + +The flag, ``add_traceback``, should be set to `True` if the traceback of the underlying exception should be reported to the user. +The default is `False`. +Do not set this to `True` if the traceback may contain secrets or other private data that shouldn't be exposed to the user. +If set to true, make sure that the exception is raised with a ``from`` clause that references the underlying exception. 
+It is the traceback of that exception that will be reported to the user. + +`WorkerTimeoutError` takes two arguments: the total elapsed time and the timeout that was exceeded. +It is normally handled automatically by the wrapper added by `build_worker`, but it can be thrown directly by the backend code if it detects a timeout. +The relevant timeout should be the ``timeout`` member of the `WorkerJobInfo` object described below. + +Here is a simple example that calls a ``do_work`` function and translates all exceptions into `WorkerFatalError` with tracebacks reported to the user: + +.. code-block:: python + + from safir.arq.uws import WorkerFatalError, WorkerJobInfo, WorkerResult + from structlog.stdlib import BoundLogger + + from ..models.domain.cutout import WorkerCutout + + + def example( + params: WorkerCutout, info: WorkerJobInfo, logger: BoundLogger + ) -> list[WorkerResult]: + try: + result_url = do_work() + except Exception as e: + raise WorkerFatalError(f"{type(e).__name__}: {e!s}") from e + return [WorkerResult(result_id="example", url=result_url)] + +Job metadata +============ + +The second argument to the worker function is a `WorkerJobInfo` object. +It includes things like the user who submitted the job, the UWS job ID and (if given) run ID, and so forth. +See its documentation for a full list. +There are two attributes that deserve special mention, however. + +The ``token`` attribute contains a delegated Gafaelfawr_ token to act on behalf of the user. +This token must be included in an :samp:`Authorization: bearer ` header in any web request that the backend makes to other Rubin Science Platform services. + +The ``timeout`` attribute contains a `~datetime.timedelta` representation of the timeout for the job. +The backend ideally should arrange to not exceed that total wall clock interval when executing. +If it does take longer, it will be killed. +See :ref:`uws-aborted-jobs`. + +.. 
_uws-aborted-jobs: + +Aborting jobs +============= + +The arq_ queuing system normally assumes that all job handlers are async. +The UWS library is designed to instead support sync backend worker functions, since the Rubin Observatory scientific code is generally written in sync Python. +This unfortunately means that the normal asyncio mechanisms for handling timeouts and canceling jobs do not work with these backend workers. + +The Safir UWS library works around this with a somewhat ugly hack. +The backend job is run in a separate process, using `concurrent.futures.ProcessPoolExecutor` with a single pool process. +If the job times out or is aborted by arq_, the process running the backend code is killed with SIGINT and then the process pool is cleaned up and recreated. + +This should mostly be transparent to you when writing backend worker functions. +The only caveat to be aware of is that your function may receive a SIGINT if it has to be aborted, which Python by default will translate into a `KeyboardInterrupt` exception. +Any temporary resources which need to be cleaned up when the job is aborted should be handled with context managers or ``finally`` blocks. + +If you replace the default SIGINT handler, you will need to take your own steps to ensure that the backend worker exits quickly and cleans up properly on receipt of a SIGINT signal. +Otherwise, you may hang the arq backend worker and prevent the service from working correctly. + +Register the worker +=================== + +Now that you have written the worker, add the name of the worker function to the `~safir.uws.UWSAppSettings.build_uws_config` call in your ``Config`` class, as mentioned in :ref:`uws-config`. +It should be passed as the value of the ``worker`` argument. + +This argument should be the name of the backend worker function as a string. +In the above example, it would be ``"example"``. +It is *not* the function itself, just the name of the function as a Python string. 
+ +Next steps +========== + +- Write a test suite :doc:`testing` diff --git a/safir/src/safir/uws/_config.py b/safir/src/safir/uws/_config.py index 9c2408fd..c6b3a34f 100644 --- a/safir/src/safir/uws/_config.py +++ b/safir/src/safir/uws/_config.py @@ -310,7 +310,7 @@ def build_uws_config( Returns ------- - uws_config + UWSConfig UWS configuration. Parameters