Skip to content

Commit

Permalink
Merge pull request #358 from lsst-sqre/tickets/DM-48173
Browse files Browse the repository at this point in the history
DM-48173: Stop using JobService where unnecessary
  • Loading branch information
rra authored Dec 17, 2024
2 parents ede0041 + e5d746b commit 39f6bc1
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 23 deletions.
24 changes: 6 additions & 18 deletions safir/src/safir/testing/uws.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from urllib.parse import parse_qs

import respx
import structlog
from httpx import AsyncClient, Request, Response
from vo_models.uws import JobSummary
from vo_models.uws.types import ExecutionPhase
Expand All @@ -29,7 +28,6 @@
SerializedJob,
UWSConfig,
)
from safir.uws._service import JobService
from safir.uws._storage import JobStore

__all__ = [
Expand Down Expand Up @@ -275,17 +273,7 @@ class MockUWSJobRunner:

def __init__(self, config: UWSConfig, arq_queue: MockArqQueue) -> None:
self._arq = arq_queue

# This duplicates some of the code in UWSDependency to avoid needing
# to set up the result store or to expose UWSFactory outside of the
# Safir package internals.
self._store = JobStore(config, AsyncClient())
self._service = JobService(
config=config,
arq_queue=self._arq,
storage=self._store,
logger=structlog.get_logger("uws"),
)

async def get_job_metadata(self, token: str, job_id: str) -> JobMetadata:
"""Get the arq job metadata for a job.
Expand All @@ -302,7 +290,7 @@ async def get_job_metadata(self, token: str, job_id: str) -> JobMetadata:
JobMetadata
arq job metadata.
"""
job = await self._service.get(token, job_id)
job = await self._store.get(token, job_id)
assert job.message_id
return await self._arq.get_job_metadata(job.message_id)

Expand All @@ -321,7 +309,7 @@ async def get_job_result(self, token: str, job_id: str) -> JobResult:
JobMetadata
arq job metadata.
"""
job = await self._service.get(token, job_id)
job = await self._store.get(token, job_id)
assert job.message_id
return await self._arq.get_job_result(job.message_id)

Expand All @@ -346,11 +334,11 @@ async def mark_in_progress(
"""
if delay:
await asyncio.sleep(delay)
job = await self._service.get(token, job_id)
job = await self._store.get(token, job_id)
assert job.message_id
await self._arq.set_in_progress(job.message_id)
await self._store.mark_executing(token, job_id, datetime.now(tz=UTC))
return await self._service.get(token, job_id)
return await self._store.get(token, job_id)

async def mark_complete(
self,
Expand Down Expand Up @@ -380,9 +368,9 @@ async def mark_complete(
"""
if delay:
await asyncio.sleep(delay)
job = await self._service.get(token, job_id)
job = await self._store.get(token, job_id)
assert job.message_id
await self._arq.set_complete(job.message_id, result=results)
job_result = await self._arq.get_job_result(job.message_id)
await self._store.mark_completed(token, job_id, job_result)
return await self._service.get(token, job_id)
return await self._store.get(token, job_id)
5 changes: 0 additions & 5 deletions safir/src/safir/uws/_workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
from ._constants import JOB_RESULT_TIMEOUT, WOBBLY_REQUEST_TIMEOUT
from ._exceptions import TaskError, UnknownJobError
from ._models import Job
from ._service import JobService
from ._storage import JobStore

P = ParamSpec("P")
Expand Down Expand Up @@ -76,9 +75,6 @@ async def create_uws_worker_context(

http_client = AsyncClient(timeout=WOBBLY_REQUEST_TIMEOUT)
storage = JobStore(config, http_client)
service = JobService(
config=config, arq_queue=arq, storage=storage, logger=logger
)
slack = None
if config.slack_webhook:
slack = SlackWebhookClient(
Expand All @@ -92,7 +88,6 @@ async def create_uws_worker_context(
"arq": arq,
"http_client": http_client,
"logger": logger,
"service": service,
"slack": slack,
"storage": storage,
}
Expand Down

0 comments on commit 39f6bc1

Please sign in to comment.