From 3bbaeef8a7d4f02857f7cbe9f516bc29af034514 Mon Sep 17 00:00:00 2001 From: Russ Allbery Date: Tue, 3 Dec 2024 17:10:06 -0800 Subject: [PATCH] Switch to vo-models for nearly all UWS responses Finish the conversion to using vo-models to generate nearly all UWS responses. Templating remains only to generate the error VOTable document. Due to limitations in pydantic-xml plus the complexity of the type system, clients will have to provide an additional argument to the UWS configuration specifying the qualified type of the `vo_models.uws.JobSummary` class used to represent jobs. Clients will also have to implement a new `to_xml_model` method on their `ParametersModel` class. --- changelog.d/20241203_171205_rra_DM_47790.md | 11 ++ docs/user-guide/uws/create-a-service.rst | 1 + docs/user-guide/uws/define-models.rst | 72 ++++++++-- safir/src/safir/uws/__init__.py | 3 +- safir/src/safir/uws/_config.py | 72 ++++------ safir/src/safir/uws/_handlers.py | 42 +++--- safir/src/safir/uws/_models.py | 114 ++++++++++------ safir/src/safir/uws/_responses.py | 21 +-- safir/src/safir/uws/_service.py | 112 +++++++++++++-- safir/src/safir/uws/templates/error.xml | 2 +- safir/src/safir/uws/templates/job.xml | 40 ------ safir/src/safir/uws/templates/parameters.xml | 10 -- safir/tests/support/uws.py | 33 ++++- safir/tests/uws/job_api_test.py | 136 +++++++++++-------- safir/tests/uws/job_error_test.py | 65 +++++---- safir/tests/uws/long_polling_test.py | 48 ++++--- safir/tests/uws/workers_test.py | 7 +- 17 files changed, 493 insertions(+), 296 deletions(-) create mode 100644 changelog.d/20241203_171205_rra_DM_47790.md delete mode 100644 safir/src/safir/uws/templates/job.xml delete mode 100644 safir/src/safir/uws/templates/parameters.xml diff --git a/changelog.d/20241203_171205_rra_DM_47790.md b/changelog.d/20241203_171205_rra_DM_47790.md new file mode 100644 index 00000000..db2eb2b8 --- /dev/null +++ b/changelog.d/20241203_171205_rra_DM_47790.md @@ -0,0 +1,11 @@ +### Backwards-incompatible 
changes + +- UWS clients must now pass an additional `job_summary_type` argument to `UWSAppSettings.build_uws_config` and implement `to_xml_model` in their implementation of `ParametersModel`, returning a subclass of the vo-models `Parameters` class. + +### Bug fixes + +- Append a colon after the error code when reporting UWS errors. + +### Other changes + +- Render all UWS XML output except for error VOTables using vo-models rather than hand-written XML templates. diff --git a/docs/user-guide/uws/create-a-service.rst b/docs/user-guide/uws/create-a-service.rst index b63768f8..aac3f3b7 100644 --- a/docs/user-guide/uws/create-a-service.rst +++ b/docs/user-guide/uws/create-a-service.rst @@ -57,6 +57,7 @@ For now, you can just insert placeholder values. def uws_config(self) -> UWSConfig: return self.build_uws_config( async_post_route=UWSRoute(...), + job_summary_type=..., parameters_type=..., sync_get_route=UWSRoute(...), sync_post_route=UWSRoute(...), diff --git a/docs/user-guide/uws/define-models.rst b/docs/user-guide/uws/define-models.rst index d77b3bbb..2de84276 100644 --- a/docs/user-guide/uws/define-models.rst +++ b/docs/user-guide/uws/define-models.rst @@ -4,13 +4,26 @@ Define job parameter models ########################### -UWS models all parameters as simple lists of key/value pairs with string values. -However, for internal purposes, most applications will want a more sophisticated parameter model than that, with better input validation. -The frontend should parse and validate the input parameters so that it can fail quickly if they are invalid, rather than creating a job, dispatching it, and only then having it fail due to invalid parameters. - -UWS applications therefore must define two models for input parameters, both Pydantic models. -The first is the model of parameters as provided by users, and is used to validate the input parameters. -The second is the model that will be passed to the backend worker. +A UWS job is defined by its input parameters. 
+Unfortunately, due to issues with the IVOA UWS standard and the need for separation between the API and backend processing, the input parameters for a job have to be defined in five different ways. + +#. A Pydantic API model representing the validated input parameters for a job. + This is the canonical input form and corresponds to a native JSON API. +#. The parameters sent to the backend worker. + Often, this may be the same as the API model, but best practice is to define two separate models. + This allows the two models to change independently, permitting changes to the backend without changing the API or changes to the API without changing the backend code. +#. An XML representation of the input parameters. + This is essentially a list of key/value pairs wrapped in a child class of `~vo_models.uws.models.Parameters` and is used for XML serialization and deserialization for the IVOA UWS protocol. + This separate model is required because the IVOA UWS standard requires a very simplistic XML serialization of job parameters that flattens any complex structure into strings, and thus is not suitable for use as the general API model for many applications. +#. The input parameters for job creation via ``POST``, since the IVOA UWS standard requires support for job creation via form ``POST``. +#. The input parameters for job creation via ``GET``, used for sync jobs. + Supporting this is optional. + +In some cases (jobs whose parameters are all simple strings or numbers), the same model can be used for 1 and 4 by specifying it as a form parameter model. +Unfortunately, the same model cannot be used for 1 and 3 even for simple applications because the XML model requires additional structure that obscures the parameters and should not be included in the JSON API model. 
+ +Therefore, in the most general case, UWS applications must define three models for input parameters: the API model of parameters as provided by users via a JSON API, the model passed to the backend worker, and an XML model that flattens all parameters to strings. +The input parameters for job creation via ``POST`` and ``GET`` are discussed in :doc:`define-inputs`. .. _uws-worker-model: @@ -58,12 +71,37 @@ Astropy types do not serialize to JSON by default, so you will need to add seria If you do this, consider adding a test case for your application that serializes your worker model to JSON, deserializes it back from JSON, and verifies that the resulting object matches the original object. +.. _uws-xml-model: + +XML parameter model +=================== + +The XML parameter model must be a subclass of `~vo_models.uws.models.Parameters`. +Each parameter must be either a `~vo_models.uws.models.Parameter` or a ``MultiValuedParameter`` (for the case where the parameter can be specified more than once for simple list support). + +This effectively requires serialization of all parameter values to strings, since the value attribute of a `~vo_models.uws.models.Parameter` only accepts simple strings to follow the IVOA UWS standard. + +Here is a simple example for the same cutout service: + +.. code-block:: python + + from vo_models.uws import MultiValuedParameter, Parameter, Parameters + + + class CutoutXmlParameters(Parameters): + id: MultiValuedParameter = Parameter(id="id") + circle: MultiValuedParameter = Parameter(id="circle") + +This class should not do any input validation other than validation of the permitted parameter IDs. +Input validation will be done by the input parameter model. + Input parameter model ===================== Every UWS application must define a Pydantic model for its input parameters. This model must inherit from `ParametersModel`. 
-In addition to defining the parameter model, it must provide two methods: a class method named ``from_job_parameters`` that takes as input the list of `UWSJobParameter` objects and returns an instance of the model, and an instance method named ``to_worker_parameters`` that converts the model to the one that will be passed to the backend worker (see :ref:`uws-worker-model`). + +In addition to defining the parameter model, it must provide three methods: a class method named ``from_job_parameters`` that takes as input the list of `UWSJobParameter` objects and returns an instance of the model, an instance method named ``to_worker_parameters`` that converts the model to the one that will be passed to the backend worker (see :ref:`uws-worker-model`), and an instance method named ``to_xml_model`` that converts the model to the XML model (see :ref:`uws-xml-model`). Often, the worker parameter model will look very similar to the input parameter model. They are still kept separate, since the input parameter model defines the API and the worker model defines the interface to the backend. 
@@ -78,6 +116,7 @@ Here is an example of a simple model for a cutout service: from pydantic import Field from safir.uws import ParameterParseError, ParametersModel, UWSJobParameter + from vo_models.uws import Parameter from .domain.cutout import Point, WorkerCircleStencil, WorkerCutout @@ -88,6 +127,9 @@ Here is an example of a simple model for a cutout service: ra, dec, radius = (float(p) for p in params.split()) return cls(center=Point(ra=ra, dec=dec), radius=radius) + def to_string(self) -> str: + return f"{self.center.ra!s} {self.center.dec!s} {self.radius!s}" + class CutoutParameters(ParametersModel[WorkerCutout]): ids: list[str] = Field(..., title="Dataset IDs") @@ -111,19 +153,33 @@ Here is an example of a simple model for a cutout service: def to_worker_parameters(self) -> WorkerCutout: return WorkerCutout(dataset_ids=self.ids, stencils=self.stencils) + def to_xml_model(self) -> CutoutXmlParameters: + ids = [Parameter(id="id", value=i) for i in self.ids] + circles = [] + for circle in self.stencils: + circles.append(Parameter(id="circle", value=circle.to_string())) + return CutoutXmlParameters(id=ids, circle=circles) + Notice that the input parameter model reuses some models from the worker (``Point`` and ``WorkerCircleStencil``), but adds a new class method to the latter via inheritance. It also uses a different parameter for the dataset IDs (``ids`` instead of ``dataset_ids``), which is a trivial example of the sort of divergence one might see between input models and backend worker models. +``CutoutXmlParameters`` is defined in :ref:`uws-xml-model`. The input models are also responsible for input parsing and validation (note the ``from_job_parameters`` and ``from_string`` methods) and converting to the worker model. The worker model should be in a separate file and kept as simple as possible, since it has to be imported by the backend worker, which may not have the dependencies installed to be able to import other frontend code. 
+The XML model must use simple key/value pairs of strings to satisfy the UWS XML API, so ``to_xml_model`` may need to do some conversion from the model back to a string representation of the parameters. + Update the application configuration ==================================== Now that you've defined the parameters model, you can update :file:`config.py` to pass that model to `UWSAppSettings.build_uws_config`, as mentioned in :ref:`uws-config`. + Set the ``parameters_type`` argument to the class name of the parameters model. In the example above, that would be ``CutoutParameters``. +Set the ``job_summary_type`` argument to ``JobSummary[XmlModel]`` where ``XmlModel`` is whatever the class name of your XML parameter model is. +In the example above, that would be ``JobSummary[CutoutXmlParameters]``. + Next steps ========== diff --git a/safir/src/safir/uws/__init__.py b/safir/src/safir/uws/__init__.py index 309c342e..f2bd0ce7 100644 --- a/safir/src/safir/uws/__init__.py +++ b/safir/src/safir/uws/__init__.py @@ -1,7 +1,7 @@ """Support library for writing UWS-enabled services.""" from ._app import UWSApplication -from ._config import ParametersModel, UWSAppSettings, UWSConfig, UWSRoute +from ._config import UWSAppSettings, UWSConfig, UWSRoute from ._exceptions import ( DatabaseSchemaError, MultiValuedParameterError, @@ -12,6 +12,7 @@ ) from ._models import ( ErrorCode, + ParametersModel, UWSJob, UWSJobError, UWSJobParameter, diff --git a/safir/src/safir/uws/_config.py b/safir/src/safir/uws/_config.py index c6b3a34f..7c66494b 100644 --- a/safir/src/safir/uws/_config.py +++ b/safir/src/safir/uws/_config.py @@ -2,16 +2,16 @@ from __future__ import annotations -from abc import ABC, abstractmethod from collections.abc import Callable, Coroutine from dataclasses import dataclass from datetime import datetime, timedelta -from typing import Generic, Self, TypeAlias, TypeVar +from typing import TypeAlias from arq.connections import RedisSettings -from pydantic import BaseModel, 
Field, SecretStr +from pydantic import Field, SecretStr from pydantic_core import Url from pydantic_settings import BaseSettings +from vo_models.uws import JobSummary from safir.arq import ArqMode, build_arq_redis_settings from safir.pydantic import ( @@ -21,7 +21,7 @@ SecondsTimedelta, ) -from ._models import UWSJob, UWSJobParameter +from ._models import ParametersModel, UWSJob, UWSJobParameter DestructionValidator: TypeAlias = Callable[[datetime, UWSJob], datetime] """Type for a validator for a new destruction time.""" @@ -31,12 +31,7 @@ ] """Type for a validator for a new execution duration.""" -T = TypeVar("T", bound=BaseModel) -"""Generic type for the worker parameters.""" - __all__ = [ - "ParametersModel", - "T", "UWSAppSettings", "UWSConfig", "UWSRoute", @@ -57,39 +52,6 @@ class UWSRoute: """Description string for API documentation.""" -class ParametersModel(BaseModel, ABC, Generic[T]): - """Defines the interface for a model suitable for job parameters.""" - - @classmethod - @abstractmethod - def from_job_parameters(cls, params: list[UWSJobParameter]) -> Self: - """Validate generic UWS parameters and convert to the internal model. - - Parameters - ---------- - params - Generic input job parameters. - - Returns - ------- - ParametersModel - Parsed cutout parameters specific to service. - - Raises - ------ - safir.uws.MultiValuedParameterError - Raised if multiple parameters are provided but not supported. - safir.uws.ParameterError - Raised if one of the parameters could not be parsed. - pydantic.ValidationError - Raised if the parameters do not validate. - """ - - @abstractmethod - def to_worker_parameters(self) -> T: - """Convert to the domain model used by the backend worker.""" - - @dataclass class UWSConfig: """Configuration for the UWS service. @@ -123,6 +85,15 @@ class encapsulates the configuration of the UWS component that may vary by aborted. """ + job_summary_type: type[JobSummary] + """Type representing the parameter-qualified job summary type. 
+ + Must be set to `~vo_models.uws.JobSummary` qualified with the appropriate + subclass of `~vo_models.uws.Parameters`. This is necessary to work around + limitations in pydantic-xml, which require the types to be known at class + instantiation time. + """ + lifetime: timedelta """The lifetime of jobs. @@ -291,6 +262,7 @@ def build_uws_config( self, *, async_post_route: UWSRoute, + job_summary_type: type[JobSummary], parameters_type: type[ParametersModel], slack_webhook: SecretStr | None = None, sync_get_route: UWSRoute | None = None, @@ -308,11 +280,6 @@ def build_uws_config( configuration. Its parameters are the additional settings accepted by the UWS library that are not part of the ``UWSAppSettings`` model. - Returns - ------- - UWSConfig - UWS configuration. - Parameters ---------- async_post_route @@ -320,6 +287,11 @@ def build_uws_config( POST. The FastAPI dependency included in this object should expect POST parameters and return a list of `~safir.uws.UWSJobParameter` objects representing the job parameters. + job_summary_type + Type representing the XML job summary type, qualified with an + appropriate subclass of `~vo_models.uws.models.Parameters`. That + subclass should be the same as that returned by the + ``to_xml_model`` method of ``parameters_type``. parameters_type Type representing the job parameters. This will be used to validate parameters and to parse them before passing them to the @@ -355,6 +327,11 @@ def build_uws_config( worker Name of the backend worker to call to execute a job. + Returns + ------- + UWSConfig + UWS configuration. 
+ Examples -------- Normally, this method is used from a property method that returns the @@ -386,6 +363,7 @@ def uws_config(self) -> UWSConfig: arq_mode=self.arq_mode, arq_redis_settings=self.arq_redis_settings, execution_duration=self.timeout, + job_summary_type=job_summary_type, lifetime=self.lifetime, parameters_type=parameters_type, signing_service_account=self.service_account, diff --git a/safir/src/safir/uws/_handlers.py b/safir/src/safir/uws/_handlers.py index a079c296..3af527bc 100644 --- a/safir/src/safir/uws/_handlers.py +++ b/safir/src/safir/uws/_handlers.py @@ -12,7 +12,6 @@ from fastapi import APIRouter, Depends, Form, Query, Request, Response from fastapi.responses import PlainTextResponse, RedirectResponse from structlog.stdlib import BoundLogger -from vo_models.uws import Jobs, Results from vo_models.uws.types import ExecutionPhase from safir.datetime import isodatetime @@ -84,11 +83,13 @@ async def get_job_list( ) -> Response: job_service = uws_factory.create_job_service() jobs = await job_service.list_jobs( - user, phases=phase, after=after, count=last + user, + base_url=str(request.url_for("get_job_list")), + phases=phase, + after=after, + count=last, ) - base_url = request.url_for("get_job_list") - xml_jobs = Jobs(jobref=[j.to_xml_model(str(base_url)) for j in jobs]) - xml = xml_jobs.to_xml(skip_empty=True) + xml = jobs.to_xml(skip_empty=True) return Response(content=xml, media_type="application/xml") @@ -127,11 +128,12 @@ async def get_job( uws_factory: Annotated[UWSFactory, Depends(uws_dependency)], ) -> Response: job_service = uws_factory.create_job_service() - job = await job_service.get( - user, job_id, wait_seconds=wait, wait_phase=phase + result_store = uws_factory.create_result_store() + job = await job_service.get_summary( + user, job_id, signer=result_store, wait_seconds=wait, wait_phase=phase ) - templates = uws_factory.create_templates() - return await templates.job(request, job) + xml = job.to_xml(skip_empty=True) + return 
Response(content=xml, media_type="application/xml") @uws_router.delete( @@ -238,11 +240,11 @@ async def get_job_error( uws_factory: Annotated[UWSFactory, Depends(uws_dependency)], ) -> Response: job_service = uws_factory.create_job_service() - job = await job_service.get(user, job_id) - if not job.error: + error = await job_service.get_error(user, job_id) + if not error: raise DataMissingError(f"Job {job_id} did not fail") templates = uws_factory.create_templates() - return templates.error(request, job.error) + return templates.error(request, error) @uws_router.get( @@ -318,9 +320,11 @@ async def get_job_parameters( uws_factory: Annotated[UWSFactory, Depends(uws_dependency)], ) -> Response: job_service = uws_factory.create_job_service() - job = await job_service.get(user, job_id) - templates = uws_factory.create_templates() - return templates.parameters(request, job) + job = await job_service.get_summary(user, job_id) + if not job.parameters: + raise DataMissingError(f"Job {job_id} has no parameters") + xml = job.parameters.to_xml(skip_empty=True) + return Response(content=xml, media_type="application/xml") @uws_router.get( @@ -403,11 +407,11 @@ async def get_job_results( uws_factory: Annotated[UWSFactory, Depends(uws_dependency)], ) -> Response: job_service = uws_factory.create_job_service() - job = await job_service.get(user, job_id) result_store = uws_factory.create_result_store() - results = [result_store.sign_url(r) for r in job.results] - xml_model = Results(results=[r.to_xml_model() for r in results]) - xml = xml_model.to_xml(skip_empty=True) + job = await job_service.get_summary(user, job_id, signer=result_store) + if not job.results: + raise DataMissingError(f"Job {job_id} has no results") + xml = job.results.to_xml(skip_empty=True) return Response(content=xml, media_type="application/xml") diff --git a/safir/src/safir/uws/_models.py b/safir/src/safir/uws/_models.py index 16480ef3..c917ab04 100644 --- a/safir/src/safir/uws/_models.py +++ 
b/safir/src/safir/uws/_models.py @@ -7,10 +7,13 @@ from __future__ import annotations +from abc import ABC, abstractmethod from dataclasses import asdict, dataclass from datetime import datetime, timedelta from enum import StrEnum +from typing import Generic, Self, TypeVar +from pydantic import BaseModel from vo_models.uws import ( ErrorSummary, JobSummary, @@ -22,9 +25,22 @@ ) from vo_models.uws.types import ErrorType, ExecutionPhase, UWSVersion +P = TypeVar("P", bound="ParametersModel") +"""Generic type for the parameters model.""" + +S = TypeVar("S", bound=JobSummary) +"""Generic type for the XML job summary for a given parameters model.""" + +W = TypeVar("W", bound=BaseModel) +"""Generic type for the parameters model passed to workers.""" + +X = TypeVar("X", bound=Parameters) +"""Generic type for the XML parameters model.""" + __all__ = [ "ACTIVE_PHASES", "ErrorCode", + "ParametersModel", "UWSJob", "UWSJobDescription", "UWSJobError", @@ -57,6 +73,43 @@ class ErrorCode(StrEnum): USAGE_ERROR = "UsageError" +class ParametersModel(BaseModel, ABC, Generic[W, X]): + """Defines the interface for a model suitable for job parameters.""" + + @classmethod + @abstractmethod + def from_job_parameters(cls, params: list[UWSJobParameter]) -> Self: + """Validate generic UWS parameters and convert to the internal model. + + Parameters + ---------- + params + Generic input job parameters. + + Returns + ------- + ParametersModel + Parsed cutout parameters specific to service. + + Raises + ------ + safir.uws.MultiValuedParameterError + Raised if multiple parameters are provided but not supported. + safir.uws.ParameterError + Raised if one of the parameters could not be parsed. + pydantic.ValidationError + Raised if the parameters do not validate. 
+ """ + + @abstractmethod + def to_worker_parameters(self) -> W: + """Convert to the domain model used by the backend worker.""" + + @abstractmethod + def to_xml_model(self) -> X: + """Convert to the XML model used in XML API responses.""" + + @dataclass class UWSJobError: """Failure information about a job.""" @@ -114,25 +167,13 @@ def to_xml_model(self) -> ResultReference: @dataclass -class UWSJobResultSigned: +class UWSJobResultSigned(UWSJobResult): """A single result from the job with a signed URL. A `UWSJobResult` is converted to a `UWSJobResultSigned` before generating the response via templating or returning the URL as a redirect. """ - result_id: str - """Identifier for the result.""" - - url: str - """Signed URL to retrieve the result.""" - - size: int | None = None - """Size of the result in bytes.""" - - mime_type: str | None = None - """MIME type of the result.""" - def to_xml_model(self) -> ResultReference: """Convert to a Pydantic XML model.""" return ResultReference( @@ -214,27 +255,6 @@ def to_xml_model(self, base_url: str) -> ShortJobDescription: ) -class GenericParameters(Parameters): - """Generic container for UWS job parameters. - - Notes - ----- - The intended use of `vo_models.uws.Parameters` is to define a subclass - with the specific parameters valid for that service. However, we store - parameters as sent to the service as a generic key/value pair in the - database and define a model that supports arbitrary parsing and - transformations, so this XML model is both not useful and not clearly - relevant. - - At least for now, define a generic subclass of `~vo_models.uws.Parameters` - that holds a generic list of parameters and convert to that for the - purposes of serialization. 
- """ - - params: list[Parameter] - """Job parameters.""" - - @dataclass class UWSJob: """Represents a single UWS job.""" @@ -304,12 +324,28 @@ class UWSJob: results: list[UWSJobResult] """The results of the job.""" - def to_xml_model(self) -> JobSummary: - """Convert to a Pydantic XML model.""" + def to_xml_model( + self, parameters_type: type[P], job_summary_type: type[S] + ) -> S: + """Convert to a Pydantic XML model. + + Parameters + ---------- + parameters_type + Model class used for the job parameters. + job_summary_type + XML model class for the job summary. + + Returns + ------- + vo_models.uws.models.JobSummary + XML model corresponding to this job. + """ results = None if self.results: results = Results(results=[r.to_xml_model() for r in self.results]) - return JobSummary( + parameters = parameters_type.from_job_parameters(self.parameters) + return job_summary_type( job_id=self.job_id, run_id=self.run_id, owner_id=self.owner, @@ -320,9 +356,7 @@ def to_xml_model(self) -> JobSummary: end_time=self.end_time, execution_duration=int(self.execution_duration.total_seconds()), destruction=self.destruction_time, - parameters=GenericParameters( - params=[p.to_xml_model() for p in self.parameters] - ), + parameters=parameters.to_xml_model(), results=results, error_summary=self.error.to_xml_model() if self.error else None, version=UWSVersion.V1_1, diff --git a/safir/src/safir/uws/_responses.py b/safir/src/safir/uws/_responses.py index 69342ebf..afc00427 100644 --- a/safir/src/safir/uws/_responses.py +++ b/safir/src/safir/uws/_responses.py @@ -8,7 +8,7 @@ from safir.datetime import isodatetime -from ._models import UWSJob, UWSJobError +from ._models import UWSJobError from ._results import ResultStore __all__ = ["UWSTemplates"] @@ -39,22 +39,3 @@ def error(self, request: Request, error: UWSJobError) -> Response: {"error": error}, media_type="application/xml", ) - - async def job(self, request: Request, job: UWSJob) -> Response: - """Return a job as an XML 
response.""" - results = [self._result_store.sign_url(r) for r in job.results] - return _templates.TemplateResponse( - request, - "job.xml", - {"job": job, "results": results}, - media_type="application/xml", - ) - - def parameters(self, request: Request, job: UWSJob) -> Response: - """Return the parameters for a job as an XML response.""" - return _templates.TemplateResponse( - request, - "parameters.xml", - {"job": job}, - media_type="application/xml", - ) diff --git a/safir/src/safir/uws/_service.py b/safir/src/safir/uws/_service.py index 95ea3e15..83e54080 100644 --- a/safir/src/safir/uws/_service.py +++ b/safir/src/safir/uws/_service.py @@ -7,6 +7,7 @@ from datetime import datetime, timedelta from structlog.stdlib import BoundLogger +from vo_models.uws import Jobs, JobSummary from vo_models.uws.types import ExecutionPhase from vo_models.vosi.availability import Availability @@ -14,7 +15,7 @@ from safir.arq.uws import WorkerJobInfo from safir.datetime import isodatetime -from ._config import ParametersModel, UWSConfig +from ._config import UWSConfig from ._constants import JOB_STOP_TIMEOUT from ._exceptions import ( InvalidPhaseError, @@ -25,11 +26,13 @@ ) from ._models import ( ACTIVE_PHASES, + ParametersModel, UWSJob, - UWSJobDescription, + UWSJobError, UWSJobParameter, UWSJobResult, ) +from ._results import ResultStore from ._storage import JobStore __all__ = ["JobService"] @@ -138,8 +141,8 @@ async def create( Returns ------- - safir.uws._models.Job - The internal representation of the newly-created job. + JobSummary + Information about the newly-created job. """ params_model = self._validate_parameters(params) job = await self._storage.add( @@ -236,7 +239,7 @@ async def get( Returns ------- UWSJob - The corresponding job. + Corresponding job. Raises ------ @@ -271,20 +274,110 @@ async def get( return job + async def get_error(self, user: str, job_id: str) -> UWSJobError | None: + """Get the error for a job, if any. 
+ + Parameters + ---------- + user + User on whose behalf this operation is performed. + job_id + Identifier of the job. + + Returns + ------- + UWSJobError or None + Error information for the job, or `None` if the job didn't fail. + + Raises + ------ + PermissionDeniedError + If the job ID doesn't exist or is for a user other than the + provided user. + """ + job = await self._storage.get(job_id) + if job.owner != user: + raise PermissionDeniedError(f"Access to job {job_id} denied") + return job.error + + async def get_summary( + self, + user: str, + job_id: str, + *, + signer: ResultStore | None = None, + wait_seconds: int | None = None, + wait_phase: ExecutionPhase | None = None, + ) -> JobSummary: + """Retrieve a job's XML model. + + This also supports long-polling, to implement UWS 1.1 blocking + behavior, and signing of results. + + Parameters + ---------- + user + User on whose behalf this operation is performed. + job_id + Identifier of the job. + signer + If provided, generate signed URLs for the results by using the + given signing object. If no object is supplied, no URLs will be + included in the result list. + wait_seconds + If given, wait up to this many seconds for the status to change + before returning. -1 indicates waiting the maximum length of + time. This is done by polling the database with exponential + backoff. This will only be honored if the phase is ``PENDING``, + ``QUEUED``, or ``EXECUTING``. + wait_phase + If ``wait`` was given, the starting phase for waiting. Returns + immediately if the initial phase doesn't match this one. + + Returns + ------- + JobSummary + Corresponding job. + + Raises + ------ + PermissionDeniedError + If the job ID doesn't exist or is for a user other than the + provided user. + + Notes + ----- + ``wait`` and related parameters are relatively inefficient since they + poll the database using exponential backoff (starting at a 0.1s delay + and increasing by 1.5x). 
This may need to be reconsidered if it + becomes a performance bottleneck. + """ + job = await self.get( + user, job_id, wait_seconds=wait_seconds, wait_phase=wait_phase + ) + if signer: + job.results = [signer.sign_url(r) for r in job.results] + return job.to_xml_model( + self._config.parameters_type, self._config.job_summary_type + ) + async def list_jobs( self, user: str, + base_url: str, *, phases: list[ExecutionPhase] | None = None, after: datetime | None = None, count: int | None = None, - ) -> list[UWSJobDescription]: + ) -> Jobs: """List the jobs for a particular user. Parameters ---------- user Name of the user whose jobs to load. + base_url + Base URL used to form URLs to the specific jobs. phases Limit the result to jobs in this list of possible execution phases. @@ -295,12 +388,13 @@ async def list_jobs( Returns ------- - list of safir.uws._models.JobDescription - List of job descriptions matching the search criteria. + Jobs + Collection of short job descriptions. """ - return await self._storage.list_jobs( + jobs = await self._storage.list_jobs( user, phases=phases, after=after, count=count ) + return Jobs(jobref=[j.to_xml_model(base_url) for j in jobs]) async def run_sync( self, diff --git a/safir/src/safir/uws/templates/error.xml b/safir/src/safir/uws/templates/error.xml index 48face0c..5fcddef6 100644 --- a/safir/src/safir/uws/templates/error.xml +++ b/safir/src/safir/uws/templates/error.xml @@ -1,7 +1,7 @@ -{{ error.error_code.value }} {{ error.message }} +{{ error.error_code.value }}: {{ error.message }} {%- if error.detail %} {{ error.detail }} diff --git a/safir/src/safir/uws/templates/job.xml b/safir/src/safir/uws/templates/job.xml deleted file mode 100644 index ff97715c..00000000 --- a/safir/src/safir/uws/templates/job.xml +++ /dev/null @@ -1,40 +0,0 @@ - - {{ job.job_id }} - {%- if job.run_id %} - {{ job.run_id }} - {%- endif %} - {{ job.owner }} - {{ job.phase.value }} - {{ job.creation_time | isodatetime }} - {%- if job.start_time %} - {{ 
job.start_time | isodatetime }} - {%- endif %} - {%- if job.end_time %} - {{ job.end_time | isodatetime }} - {%- endif %} - {{ job.execution_duration.total_seconds() | int }} - {{ job.destruction_time | isodatetime }} - - {%- for param in job.parameters %} - {{ param.value }} - {%- endfor %} - - {%- if job.results %} - - {%- for result in results %} - - {%- endfor %} - - {%- endif %} - {%- if job.error %} - - {{ job.error.error_code.value }} {{ job.error.message }} - - {%- endif %} - diff --git a/safir/src/safir/uws/templates/parameters.xml b/safir/src/safir/uws/templates/parameters.xml deleted file mode 100644 index a95377f9..00000000 --- a/safir/src/safir/uws/templates/parameters.xml +++ /dev/null @@ -1,10 +0,0 @@ - - {%- for param in job.parameters %} - {{ param.value }} - {%- endfor %} - diff --git a/safir/tests/support/uws.py b/safir/tests/support/uws.py index da7b3abf..39e6d85e 100644 --- a/safir/tests/support/uws.py +++ b/safir/tests/support/uws.py @@ -8,21 +8,30 @@ from arq.connections import RedisSettings from fastapi import Form, Query from pydantic import BaseModel, SecretStr +from vo_models.uws import JobSummary, Parameter, Parameters from safir.arq import ArqMode from safir.uws import ParametersModel, UWSConfig, UWSJobParameter, UWSRoute __all__ = [ + "SimpleJobParameters", "SimpleParameters", + "assert_jobs_equal", "build_uws_config", ] +class SimpleJobParameters(Parameters): + name: Parameter = Parameter(id="name") + + class SimpleWorkerParameters(BaseModel): name: str -class SimpleParameters(ParametersModel[SimpleWorkerParameters]): +class SimpleParameters( + ParametersModel[SimpleWorkerParameters, SimpleJobParameters] +): name: str @classmethod @@ -34,6 +43,9 @@ def from_job_parameters(cls, params: list[UWSJobParameter]) -> Self: def to_worker_parameters(self) -> SimpleWorkerParameters: return SimpleWorkerParameters(name=self.name) + def to_xml_model(self) -> SimpleJobParameters: + return SimpleJobParameters(name=Parameter(id="name", 
value=self.name)) + async def _get_dependency( name: Annotated[str, Query()], @@ -58,6 +70,7 @@ def build_uws_config(database_url: str, database_password: str) -> UWSConfig: database_url=database_url, database_password=SecretStr(database_password), execution_duration=timedelta(minutes=10), + job_summary_type=JobSummary[SimpleJobParameters], lifetime=timedelta(days=1), parameters_type=SimpleParameters, signing_service_account="signer@example.com", @@ -70,3 +83,21 @@ def build_uws_config(database_url: str, database_password: str) -> UWSConfig: ), worker="hello", ) + + +def assert_jobs_equal(seen: str, expected: str) -> None: + """Assert that two job XML documents are equal. + + The comparison is done by converting both to a `~vo_models.uws.JobSummary` + object qualified with the parameter type used for tests. + + Parameters + ---------- + seen + XML returned by the application under test. + expected + Expected XML. + """ + seen_model = JobSummary[SimpleJobParameters].from_xml(seen) + expected_model = JobSummary[SimpleJobParameters].from_xml(expected) + assert seen_model.model_dump() == expected_model.model_dump() diff --git a/safir/tests/uws/job_api_test.py b/safir/tests/uws/job_api_test.py index 3880faa9..b0b26751 100644 --- a/safir/tests/uws/job_api_test.py +++ b/safir/tests/uws/job_api_test.py @@ -24,6 +24,8 @@ from safir.uws import UWSConfig, UWSJob, UWSJobParameter, UWSJobResult from safir.uws._dependencies import UWSFactory +from ..support.uws import SimpleJobParameters, assert_jobs_equal + PENDING_JOB = """ = 1 assert r.status_code == 200 - assert r.text == PENDING_JOB.strip().format( - "PENDING", - isodatetime(job.creation_time), - isodatetime(job.creation_time + timedelta(seconds=24 * 60 * 60)), + assert_jobs_equal( + r.text, + PENDING_JOB.format( + "PENDING", + isodatetime(job.creation_time), + isodatetime(job.creation_time + timedelta(seconds=24 * 60 * 60)), + ), ) # Start the job and worker. 
@@ -117,10 +122,13 @@ async def test_poll( ) assert r.status_code == 200 assert r.url == "https://example.com/test/jobs/1" - assert r.text == PENDING_JOB.strip().format( - "QUEUED", - isodatetime(job.creation_time), - isodatetime(job.creation_time + timedelta(seconds=24 * 60 * 60)), + assert_jobs_equal( + r.text, + PENDING_JOB.format( + "QUEUED", + isodatetime(job.creation_time), + isodatetime(job.creation_time + timedelta(seconds=24 * 60 * 60)), + ), ) # Poll for a change from queued, which we should see after half a second. @@ -135,10 +143,13 @@ async def test_poll( ) assert r.status_code == 200 assert job.start_time - assert r.text == EXECUTING_JOB.strip().format( - isodatetime(job.creation_time), - isodatetime(job.start_time), - isodatetime(job.creation_time + timedelta(seconds=24 * 60 * 60)), + assert_jobs_equal( + r.text, + EXECUTING_JOB.format( + isodatetime(job.creation_time), + isodatetime(job.start_time), + isodatetime(job.creation_time + timedelta(seconds=24 * 60 * 60)), + ), ) # Now, wait again, in parallel with the job finishing. 
We should get a @@ -161,10 +172,13 @@ async def test_poll( assert r.status_code == 200 assert job.start_time assert job.end_time - assert r.text == FINISHED_JOB.strip().format( - isodatetime(job.creation_time), - isodatetime(job.start_time), - isodatetime(job.end_time), - isodatetime(job.creation_time + timedelta(seconds=24 * 60 * 60)), + assert_jobs_equal( + r.text, + FINISHED_JOB.format( + isodatetime(job.creation_time), + isodatetime(job.start_time), + isodatetime(job.end_time), + isodatetime(job.creation_time + timedelta(seconds=24 * 60 * 60)), + ), ) assert (current_datetime() - now).total_seconds() >= 2 diff --git a/safir/tests/uws/workers_test.py b/safir/tests/uws/workers_test.py index 4666956b..29b0e8ba 100644 --- a/safir/tests/uws/workers_test.py +++ b/safir/tests/uws/workers_test.py @@ -271,16 +271,17 @@ async def test_build_uws_worker( # Expiring jobs should do nothing since the destruction time of our one # job has not passed. - jobs = await job_service.list_jobs("user") + jobs = await job_service.list_jobs("user", "https://example.com") await expire_jobs(ctx) - assert await job_service.list_jobs("user") == jobs + assert await job_service.list_jobs("user", "https://example.com") == jobs # Change the destruction date of the job and then it should be expired. past = current_datetime() - timedelta(minutes=5) expires = await job_service.update_destruction("user", job.job_id, past) assert expires == past await expire_jobs(ctx) - assert await job_service.list_jobs("user") == [] + jobs = await job_service.list_jobs("user", "https://example.com") + assert not jobs.jobref def nonnegative(value: int) -> None: if value < 0: