diff --git a/safir/src/safir/testing/uws.py b/safir/src/safir/testing/uws.py index 118e0f9..b453223 100644 --- a/safir/src/safir/testing/uws.py +++ b/safir/src/safir/testing/uws.py @@ -9,7 +9,6 @@ from urllib.parse import parse_qs import respx -import structlog from httpx import AsyncClient, Request, Response from vo_models.uws import JobSummary from vo_models.uws.types import ExecutionPhase @@ -29,7 +28,6 @@ SerializedJob, UWSConfig, ) -from safir.uws._service import JobService from safir.uws._storage import JobStore __all__ = [ @@ -275,17 +273,7 @@ class MockUWSJobRunner: def __init__(self, config: UWSConfig, arq_queue: MockArqQueue) -> None: self._arq = arq_queue - - # This duplicates some of the code in UWSDependency to avoid needing - # to set up the result store or to expose UWSFactory outside of the - # Safir package internals. self._store = JobStore(config, AsyncClient()) - self._service = JobService( - config=config, - arq_queue=self._arq, - storage=self._store, - logger=structlog.get_logger("uws"), - ) async def get_job_metadata(self, token: str, job_id: str) -> JobMetadata: """Get the arq job metadata for a job. @@ -302,7 +290,7 @@ async def get_job_metadata(self, token: str, job_id: str) -> JobMetadata: JobMetadata arq job metadata. """ - job = await self._service.get(token, job_id) + job = await self._store.get(token, job_id) assert job.message_id return await self._arq.get_job_metadata(job.message_id) @@ -321,7 +309,7 @@ async def get_job_result(self, token: str, job_id: str) -> JobResult: JobMetadata arq job metadata. """ - job = await self._service.get(token, job_id) + job = await self._store.get(token, job_id) assert job.message_id return await self._arq.get_job_result(job.message_id) @@ -346,11 +334,11 @@ async def mark_in_progress( """ if delay: await asyncio.sleep(delay) - job = await self._service.get(token, job_id) + job = await self._store.get(token, job_id) assert job.message_id await self._arq.set_in_progress(job.message_id) await self._store.mark_executing(token, job_id, datetime.now(tz=UTC)) - return await self._service.get(token, job_id) + return await self._store.get(token, job_id) async def mark_complete( self, @@ -380,9 +368,9 @@ async def mark_complete( """ if delay: await asyncio.sleep(delay) - job = await self._service.get(token, job_id) + job = await self._store.get(token, job_id) assert job.message_id await self._arq.set_complete(job.message_id, result=results) job_result = await self._arq.get_job_result(job.message_id) await self._store.mark_completed(token, job_id, job_result) - return await self._service.get(token, job_id) + return await self._store.get(token, job_id) diff --git a/safir/src/safir/uws/_workers.py b/safir/src/safir/uws/_workers.py index 48f94ca..e4991ec 100644 --- a/safir/src/safir/uws/_workers.py +++ b/safir/src/safir/uws/_workers.py @@ -30,7 +30,6 @@ from ._constants import JOB_RESULT_TIMEOUT, WOBBLY_REQUEST_TIMEOUT from ._exceptions import TaskError, UnknownJobError from ._models import Job -from ._service import JobService from ._storage import JobStore P = ParamSpec("P") @@ -76,9 +75,6 @@ async def create_uws_worker_context( http_client = AsyncClient(timeout=WOBBLY_REQUEST_TIMEOUT) storage = JobStore(config, http_client) - service = JobService( - config=config, arq_queue=arq, storage=storage, logger=logger - ) slack = None if config.slack_webhook: slack = SlackWebhookClient( @@ -92,7 +88,6 @@ async def create_uws_worker_context( "arq": arq, "http_client": http_client, "logger": logger, - "service": service, "slack": slack, "storage": storage, }