From f1cbefbe490d18a0552509a84e23bcb0f4ae7b60 Mon Sep 17 00:00:00 2001 From: Pedro Crespo-Valero <32402063+pcrespov@users.noreply.github.com> Date: Fri, 12 Jan 2024 11:03:42 +0100 Subject: [PATCH 1/8] ruff it --- .../api/routes/files.py | 2 +- .../core/_profiler_middleware.py | 5 +- .../services/director_v2.py | 2 +- .../services/log_streaming.py | 23 +++++----- .../services/solver_job_models_converters.py | 2 +- .../services/webserver.py | 9 ++-- .../utils/http_calls_capture_processing.py | 46 ++++++++----------- .../tests/unit/_with_db/conftest.py | 3 +- .../tests/unit/_with_db/test_product.py | 4 +- .../tests/unit/api_solvers/conftest.py | 6 +-- .../test_api_routers_solvers_jobs_logs.py | 5 +- .../api-server/tests/unit/test_api_files.py | 12 +++-- .../tests/unit/test_api_solver_jobs.py | 13 +++--- .../api-server/tests/unit/test_api_solvers.py | 2 +- .../api-server/tests/unit/test_api_wallets.py | 7 +-- .../tests/unit/test_services_rabbitmq.py | 10 ++-- ...t_services_solver_job_models_converters.py | 4 +- 17 files changed, 74 insertions(+), 81 deletions(-) diff --git a/services/api-server/src/simcore_service_api_server/api/routes/files.py b/services/api-server/src/simcore_service_api_server/api/routes/files.py index 37d22ce739e..f3925fbd122 100644 --- a/services/api-server/src/simcore_service_api_server/api/routes/files.py +++ b/services/api-server/src/simcore_service_api_server/api/routes/files.py @@ -172,7 +172,7 @@ async def upload_file( request: Request, file: Annotated[UploadFile, FileParam(...)], user_id: Annotated[int, Depends(get_current_user_id)], - content_length: str | None = Header(None), # noqa: B008 + content_length: str | None = Header(None), ): """Uploads a single file to the system""" # TODO: For the moment we upload file here and re-upload to S3 diff --git a/services/api-server/src/simcore_service_api_server/core/_profiler_middleware.py b/services/api-server/src/simcore_service_api_server/core/_profiler_middleware.py index af9ab60159b..3225d3ce946 100644 --- a/services/api-server/src/simcore_service_api_server/core/_profiler_middleware.py +++ b/services/api-server/src/simcore_service_api_server/core/_profiler_middleware.py @@ -14,7 +14,7 @@ def _check_response_headers( "application/x-ndjson", "application/json", } # nosec - headers: dict = dict() + headers: dict = {} headers[b"content-type"] = b"application/x-ndjson" return list(headers.items()) @@ -36,7 +36,8 @@ def is_last_response(response_headers: dict[bytes, bytes], message: dict[str, An return True if (more_body := message.get("more_body")) is not None: return not more_body - raise RuntimeError("Could not determine if last response") + msg = "Could not determine if last response" + raise RuntimeError(msg) class ApiServerProfilerMiddleware: diff --git a/services/api-server/src/simcore_service_api_server/services/director_v2.py b/services/api-server/src/simcore_service_api_server/services/director_v2.py index dbb18f12569..e0fdd1c41d2 100644 --- a/services/api-server/src/simcore_service_api_server/services/director_v2.py +++ b/services/api-server/src/simcore_service_api_server/services/director_v2.py @@ -84,7 +84,7 @@ def _handle_errors_context(project_id: UUID): detail=f"Job {project_id} not found", ) from err - raise err + raise # server errors are logged and re-raised as 503 assert codes.is_server_error(err.response.status_code) # nosec diff --git a/services/api-server/src/simcore_service_api_server/services/log_streaming.py b/services/api-server/src/simcore_service_api_server/services/log_streaming.py index 8c9ec8193e4..acc8bdf075c 100644 --- a/services/api-server/src/simcore_service_api_server/services/log_streaming.py +++ b/services/api-server/src/simcore_service_api_server/services/log_streaming.py @@ -1,7 +1,8 @@ import asyncio from asyncio import Queue +from collections.abc import AsyncIterable, Awaitable, Callable from datetime import datetime, timezone -from typing import AsyncIterable, Awaitable, Callable, Final +from typing import Final from models_library.rabbitmq_messages import LoggerRabbitMessage from models_library.users import UserID @@ -61,9 +62,8 @@ async def _distribute_logs(self, data: bytes): ) callback = self._log_streamers.get(item.job_id) if callback is None: - raise LogStreamerNotRegistered( - f"Could not forward log because a logstreamer associated with job_id={item.job_id} was not registered" - ) + msg = f"Could not forward log because a logstreamer associated with job_id={item.job_id} was not registered" + raise LogStreamerNotRegistered(msg) await callback(item) return True @@ -71,9 +71,8 @@ async def register( self, job_id: JobID, callback: Callable[[JobLog], Awaitable[None]] ): if job_id in self._log_streamers: - raise LogStreamerRegistionConflict( - f"A stream was already connected to {job_id=}. Only a single stream can be connected at the time" - ) + msg = f"A stream was already connected to {job_id=}. Only a single stream can be connected at the time" + raise LogStreamerRegistionConflict(msg) self._log_streamers[job_id] = callback await self._rabbit_client.add_topics( LoggerRabbitMessage.get_channel_name(), topics=[f"{job_id}.*"] @@ -81,7 +80,8 @@ async def register( async def deregister(self, job_id: JobID): if job_id not in self._log_streamers: - raise LogStreamerNotRegistered(f"No stream was connected to {job_id=}.") + msg = f"No stream was connected to {job_id=}." + raise LogStreamerNotRegistered(msg) await self._rabbit_client.remove_topics( LoggerRabbitMessage.get_channel_name(), topics=[f"{job_id}.*"] ) @@ -123,13 +123,12 @@ async def __aexit__(self, exc_type, exc, tb): async def _project_done(self) -> bool: task = await self._director2_api.get_computation(self._job_id, self._user_id) - return not task.stopped is None + return task.stopped is not None async def log_generator(self) -> AsyncIterable[str]: if not self._is_registered: - raise LogStreamerNotRegistered( - f"LogStreamer for job_id={self._job_id} is not correctly registered" - ) + msg = f"LogStreamer for job_id={self._job_id} is not correctly registered" + raise LogStreamerNotRegistered(msg) last_log_time: datetime | None = None while True: while self._queue.empty(): diff --git a/services/api-server/src/simcore_service_api_server/services/solver_job_models_converters.py b/services/api-server/src/simcore_service_api_server/services/solver_job_models_converters.py index 087f251019e..140d56e7bc0 100644 --- a/services/api-server/src/simcore_service_api_server/services/solver_job_models_converters.py +++ b/services/api-server/src/simcore_service_api_server/services/solver_job_models_converters.py @@ -205,7 +205,7 @@ def create_job_from_project( assert urllib.parse.quote_plus(solver_key) in project.name # nosec # get solver node - node_id = list(project.workbench.keys())[0] + node_id = next(iter(project.workbench.keys())) solver_node: Node = project.workbench[node_id] job_inputs: JobInputs = create_job_inputs_from_node_inputs( inputs=solver_node.inputs or {} diff --git a/services/api-server/src/simcore_service_api_server/services/webserver.py b/services/api-server/src/simcore_service_api_server/services/webserver.py index 5fd727f0560..1dba7cd9dba 100644 --- a/services/api-server/src/simcore_service_api_server/services/webserver.py +++ b/services/api-server/src/simcore_service_api_server/services/webserver.py @@ -406,8 +406,7 @@ async def get_project_node_pricing_unit( ) response.raise_for_status() - data = Envelope[PricingUnitGet].parse_raw(response.text).data - return data + return Envelope[PricingUnitGet].parse_raw(response.text).data async def connect_pricing_unit_to_project_node( self, @@ -469,8 +468,7 @@ async def get_project_wallet(self, project_id: ProjectID) -> WalletGet | None: cookies=self.session_cookies, ) response.raise_for_status() - data = Envelope[WalletGet].parse_raw(response.text).data - return data + return Envelope[WalletGet].parse_raw(response.text).data # PRODUCTS ------------------------------------------------- @@ -498,8 +496,7 @@ async def get_service_pricing_plan( cookies=self.session_cookies, ) response.raise_for_status() - data = Envelope[ServicePricingPlanGet].parse_raw(response.text).data - return data + return Envelope[ServicePricingPlanGet].parse_raw(response.text).data # MODULES APP SETUP ------------------------------------------------------------- diff --git a/services/api-server/src/simcore_service_api_server/utils/http_calls_capture_processing.py b/services/api-server/src/simcore_service_api_server/utils/http_calls_capture_processing.py index 93a9f47c6f8..747c4cfb345 100644 --- a/services/api-server/src/simcore_service_api_server/utils/http_calls_capture_processing.py +++ b/services/api-server/src/simcore_service_api_server/utils/http_calls_capture_processing.py @@ -52,22 +52,20 @@ def check_compatibility(cls, values): anyOf = values.get("anyOf") allOf = values.get("allOf") oneOf = values.get("oneOf") - if type_ != "str": - if pattern is not None or format_ is not None: - raise ValueError( - f"For {type_=} both {pattern=} and {format_=} must be None" - ) + if type_ != "str" and (pattern is not None or format_ is not None): + msg = f"For {type_=} both {pattern=} and {format_=} must be None" + raise ValueError(msg) if type_ is None and oneOf is None and anyOf is None and allOf is None: - raise ValueError("all of 'type_', 'oneOf', 'anyOf' and 'allOf' were None") + msg = "all of 'type_', 'oneOf', 'anyOf' and 'allOf' were None" + raise ValueError(msg) def _check_no_recursion(v: list["CapturedParameterSchema"]): if v is not None and not all( elm.anyOf is None and elm.oneOf is None and elm.allOf is None for elm in v ): - raise ValueError( - "For simplicity we only allow top level schema have oneOf, anyOf or allOf" - ) + msg = "For simplicity we only allow top level schema have oneOf, anyOf or allOf" + raise ValueError(msg) _check_no_recursion(anyOf) _check_no_recursion(allOf) @@ -78,9 +76,8 @@ def _check_no_recursion(v: list["CapturedParameterSchema"]): def regex_pattern(self) -> str: # first deal with recursive types: if self.oneOf: - raise NotImplementedError( - "Current version cannot compute regex patterns in case of oneOf. Please go ahead and implement it yourself." - ) + msg = "Current version cannot compute regex patterns in case of oneOf. Please go ahead and implement it yourself." + raise NotImplementedError(msg) if self.anyOf: return "|".join([elm.regex_pattern for elm in self.anyOf]) if self.allOf: @@ -101,9 +98,8 @@ def regex_pattern(self) -> str: else: pattern = r".*" # should match any string if pattern is None: - raise OpenApiSpecIssue( - f"Encountered invalid {self.type_=} and {self.format_=} combination" - ) + msg = f"Encountered invalid {self.type_=} and {self.format_=} combination" + raise OpenApiSpecIssue(msg) return pattern @@ -175,9 +171,8 @@ def _get_openapi_specs(host: service_hosts) -> dict[str, Any]: settings = DirectorV2Settings() url = settings.base_url + "/api/v2/openapi.json" else: - raise OpenApiSpecIssue( - f"{host=} has not been added yet to the testing system. Please do so yourself" - ) + msg = f"{host=} has not been added yet to the testing system. Please do so yourself" + raise OpenApiSpecIssue(msg) with httpx.Client() as session: # http://127.0.0.1:30010/dev/doc/swagger.json # http://127.0.0.1:8006/api/v0/openapi.json @@ -224,9 +219,8 @@ def parts(p: str) -> tuple[str, ...]: path=p, path_parameters=list(path_params.values()), ) - raise PathNotInOpenApiSpecification( - f"Could not find a path matching {response_path} in " - ) + msg = f"Could not find a path matching {response_path} in " + raise PathNotInOpenApiSpecification(msg) def _get_params( @@ -235,15 +229,13 @@ def _get_params( """Returns all parameters for the method associated with a given resource (and optionally also a given method)""" endpoints: dict[str, Any] | None if (endpoints := openapi_spec["paths"].get(path)) is None: - raise PathNotInOpenApiSpecification( - f"{path} was not in the openapi specification" - ) + msg = f"{path} was not in the openapi specification" + raise PathNotInOpenApiSpecification(msg) all_params: list[CapturedParameter] = [] for verb in [method] if method is not None else list(endpoints): if (verb_spec := endpoints.get(verb)) is None: - raise VerbNotInPath( - f"the verb '{verb}' was not available in '{path}' in {openapi_spec}" - ) + msg = f"the verb '{verb}' was not available in '{path}' in {openapi_spec}" + raise VerbNotInPath(msg) if (params := verb_spec.get("parameters")) is None: continue all_params += parse_obj_as(list[CapturedParameter], params) diff --git a/services/api-server/tests/unit/_with_db/conftest.py b/services/api-server/tests/unit/_with_db/conftest.py index 3570c60dbb1..250b67d986c 100644 --- a/services/api-server/tests/unit/_with_db/conftest.py +++ b/services/api-server/tests/unit/_with_db/conftest.py @@ -8,9 +8,8 @@ import shutil import subprocess import sys -from collections.abc import AsyncIterator, Callable +from collections.abc import AsyncGenerator, AsyncIterator, Callable from pathlib import Path -from typing import AsyncGenerator import aiopg.sa import aiopg.sa.engine as aiopg_sa_engine diff --git a/services/api-server/tests/unit/_with_db/test_product.py b/services/api-server/tests/unit/_with_db/test_product.py index af0fb995ffd..bd14faf087e 100644 --- a/services/api-server/tests/unit/_with_db/test_product.py +++ b/services/api-server/tests/unit/_with_db/test_product.py @@ -5,8 +5,8 @@ # pylint: disable=unused-variable import datetime +from collections.abc import AsyncGenerator, Callable from decimal import Decimal -from typing import AsyncGenerator, Callable import httpx import respx @@ -102,7 +102,7 @@ def _get_service_side_effect(request: httpx.Request, **kwargs): ).mock(side_effect=_get_service_side_effect) for key in keys: - response = await client.get( + await client.get( f"{API_VTAG}/solvers/simcore/services/comp/isolve/releases/2.0.24", auth=httpx.BasicAuth(key.api_key, key.api_secret), ) diff --git a/services/api-server/tests/unit/api_solvers/conftest.py b/services/api-server/tests/unit/api_solvers/conftest.py index 18e8bab0825..a7b813776da 100644 --- a/services/api-server/tests/unit/api_solvers/conftest.py +++ b/services/api-server/tests/unit/api_solvers/conftest.py @@ -3,10 +3,10 @@ # pylint: disable=unused-variable -from collections.abc import Callable +from collections.abc import AsyncIterable, Callable from copy import deepcopy from datetime import datetime, timedelta -from typing import Any, AsyncIterable, Final +from typing import Any, Final import httpx import pytest @@ -109,4 +109,4 @@ def _get_computation(request: httpx.Request, **kwargs) -> httpx.Response: mocked_directorv2_service_api_base.get( path__regex=r"/v2/computations/(?P[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12})" ).mock(side_effect=_get_computation) - yield mocked_directorv2_service_api_base + return mocked_directorv2_service_api_base diff --git a/services/api-server/tests/unit/api_solvers/test_api_routers_solvers_jobs_logs.py b/services/api-server/tests/unit/api_solvers/test_api_routers_solvers_jobs_logs.py index 5958f9661dc..0456b74f704 100644 --- a/services/api-server/tests/unit/api_solvers/test_api_routers_solvers_jobs_logs.py +++ b/services/api-server/tests/unit/api_solvers/test_api_routers_solvers_jobs_logs.py @@ -7,8 +7,9 @@ import asyncio import logging +from collections.abc import Awaitable, Callable, Iterable from pprint import pprint -from typing import Awaitable, Callable, Final, Iterable +from typing import Final import httpx import pytest @@ -84,7 +85,7 @@ def fake_project_for_streaming( mocker.patch( "simcore_service_api_server.api.routes.solvers_jobs_getters._raise_if_job_not_associated_with_solver" ) - yield fake_project + return fake_project async def test_log_streaming( diff --git a/services/api-server/tests/unit/test_api_files.py b/services/api-server/tests/unit/test_api_files.py index 015a3db91df..09b534c83df 100644 --- a/services/api-server/tests/unit/test_api_files.py +++ b/services/api-server/tests/unit/test_api_files.py @@ -1,4 +1,5 @@ import datetime +from collections.abc import Callable # pylint: disable=protected-access # pylint: disable=redefined-outer-name @@ -6,7 +7,7 @@ # pylint: disable=unused-argument # pylint: disable=unused-variable from pathlib import Path -from typing import Any, Callable +from typing import Any from uuid import UUID import httpx @@ -210,7 +211,7 @@ def delete_side_effect( ) -> Any: return capture.response_body - respx_mock = respx_mock_from_capture( + respx_mock_from_capture( [mocked_storage_service_api_base], project_tests_dir / "mocks" / "delete_file.json", [search_side_effect, delete_side_effect], @@ -283,7 +284,7 @@ async def test_get_upload_links( ) assert response.status_code == status.HTTP_200_OK else: - assert False + raise AssertionError @pytest.mark.parametrize( @@ -327,10 +328,11 @@ def side_effect_callback( response["data"][0]["file_uuid"] = "/".join(file_uuid_parts) response["data"][0]["file_id"] = "/".join(file_uuid_parts) else: - raise ValueError(f"Encountered unexpected {key=}") + msg = f"Encountered unexpected {key=}" + raise ValueError(msg) return response - respx_mock = respx_mock_from_capture( + respx_mock_from_capture( [mocked_storage_service_api_base], project_tests_dir / "mocks" / "search_file_checksum.json", [side_effect_callback], diff --git a/services/api-server/tests/unit/test_api_solver_jobs.py b/services/api-server/tests/unit/test_api_solver_jobs.py index 028978fcafa..eb949fa37eb 100644 --- a/services/api-server/tests/unit/test_api_solver_jobs.py +++ b/services/api-server/tests/unit/test_api_solver_jobs.py @@ -1,6 +1,7 @@ from decimal import Decimal +from collections.abc import Callable from pathlib import Path -from typing import Any, Callable, Final +from typing import Any, Final from uuid import UUID import httpx @@ -90,7 +91,7 @@ def _get_wallet_side_effect( response["data"]["walletId"] = _wallet_id return response - respx_mock = respx_mock_from_capture( + respx_mock_from_capture( [mocked_webserver_service_api_base], project_tests_dir / "mocks" / capture, [_get_job_wallet_side_effect, _get_wallet_side_effect], @@ -168,7 +169,7 @@ def _get_pricing_unit_side_effect( ) -> Any: return capture.response_body - respx_mock = respx_mock_from_capture( + respx_mock_from_capture( [mocked_webserver_service_api_base], project_tests_dir / "mocks" / capture_file, [_get_job_side_effect, _get_pricing_unit_side_effect] @@ -252,7 +253,7 @@ def _put_pricing_plan_and_unit_side_effect( callbacks.append(get_inspect_job_side_effect(job_id=_job_id)) _put_pricing_plan_and_unit_side_effect.was_called = False - respx_mock = respx_mock_from_capture( + respx_mock_from_capture( [mocked_webserver_service_api_base, mocked_directorv2_service_api_base], project_tests_dir / "mocks" / capture_name, callbacks, @@ -289,7 +290,7 @@ async def test_get_solver_job_pricing_unit_no_payment( _version: str = "2.1.24" _job_id: str = "1eefc09b-5d08-4022-bc18-33dedbbd7d0f" - respx_mock = respx_mock_from_capture( + respx_mock_from_capture( [mocked_directorv2_service_api_base, mocked_webserver_service_api_base], project_tests_dir / "mocks" / "start_job_no_payment.json", [_start_job_side_effect, get_inspect_job_side_effect(job_id=_job_id)], @@ -330,7 +331,7 @@ def _stop_job_side_effect( return jsonable_encoder(task) - respx_mock = respx_mock_from_capture( + respx_mock_from_capture( [mocked_directorv2_service_api_base], project_tests_dir / "mocks" / "stop_job.json", [_stop_job_side_effect, get_inspect_job_side_effect(job_id=_job_id)], diff --git a/services/api-server/tests/unit/test_api_solvers.py b/services/api-server/tests/unit/test_api_solvers.py index 0969f70e8bd..eac6ce83471 100644 --- a/services/api-server/tests/unit/test_api_solvers.py +++ b/services/api-server/tests/unit/test_api_solvers.py @@ -1,5 +1,5 @@ +from collections.abc import Callable from pathlib import Path -from typing import Callable import httpx import pytest diff --git a/services/api-server/tests/unit/test_api_wallets.py b/services/api-server/tests/unit/test_api_wallets.py index 92f3fdd7aea..84749dbae02 100644 --- a/services/api-server/tests/unit/test_api_wallets.py +++ b/services/api-server/tests/unit/test_api_wallets.py @@ -1,5 +1,6 @@ +from collections.abc import Callable from pathlib import Path -from typing import Any, Callable +from typing import Any import httpx import pytest @@ -43,7 +44,7 @@ def _get_wallet_side_effect( response["data"]["walletId"] = path_params["wallet_id"] return response - respx_mock = respx_mock_from_capture( + respx_mock_from_capture( [mocked_webserver_service_api_base], project_tests_dir / "mocks" / capture, [_get_wallet_side_effect], @@ -73,7 +74,7 @@ async def test_get_default_wallet( project_tests_dir: Path, ): - respx_mock = respx_mock_from_capture( + respx_mock_from_capture( [mocked_webserver_service_api_base], project_tests_dir / "mocks" / "get_default_wallet.json", [], diff --git a/services/api-server/tests/unit/test_services_rabbitmq.py b/services/api-server/tests/unit/test_services_rabbitmq.py index ee42af20fdc..906c95d90ee 100644 --- a/services/api-server/tests/unit/test_services_rabbitmq.py +++ b/services/api-server/tests/unit/test_services_rabbitmq.py @@ -7,10 +7,10 @@ import asyncio import logging import random -from collections.abc import AsyncIterable, Callable +from collections.abc import AsyncIterable, Callable, Iterable from contextlib import asynccontextmanager from datetime import datetime, timedelta -from typing import Final, Iterable +from typing import Final from unittest.mock import AsyncMock import httpx @@ -99,7 +99,7 @@ def node_id(faker: Faker) -> NodeID: async def log_distributor( client: httpx.AsyncClient, app: FastAPI ) -> AsyncIterable[LogDistributor]: - yield get_log_distributor(app) + return get_log_distributor(app) async def test_subscribe_publish_receive_logs( @@ -219,7 +219,7 @@ async def _(job_log: JobLog): pass await log_distributor.register(project_id, _) - with pytest.raises(LogStreamerRegistionConflict) as e_info: + with pytest.raises(LogStreamerRegistionConflict): await log_distributor.register(project_id, _) await log_distributor.deregister(project_id) @@ -314,7 +314,7 @@ def computation_done() -> Iterable[Callable[[], bool]]: def _job_done() -> bool: return datetime.now() >= stop_time - yield _job_done + return _job_done @pytest.fixture diff --git a/services/api-server/tests/unit/test_services_solver_job_models_converters.py b/services/api-server/tests/unit/test_services_solver_job_models_converters.py index 415c0e60623..a7972c66f3e 100644 --- a/services/api-server/tests/unit/test_services_solver_job_models_converters.py +++ b/services/api-server/tests/unit/test_services_solver_job_models_converters.py @@ -76,7 +76,7 @@ def test_job_to_node_inputs_conversion(): ), } ) - for _name, value in job_inputs.values.items(): + for value in job_inputs.values.values(): assert parse_obj_as(ArgumentTypes, value) == value node_inputs: InputsDict = { @@ -93,7 +93,7 @@ def test_job_to_node_inputs_conversion(): ), } - for _name, value in node_inputs.items(): + for value in node_inputs.values(): assert parse_obj_as(InputTypes, value) == value # test transformations in both directions From 7dc8ba32f49bfed8fbb1731354dfb792c18190e2 Mon Sep 17 00:00:00 2001 From: Pedro Crespo-Valero <32402063+pcrespov@users.noreply.github.com> Date: Fri, 12 Jan 2024 11:04:59 +0100 Subject: [PATCH 2/8] log --- .../src/simcore_service_api_server/utils/client_base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/api-server/src/simcore_service_api_server/utils/client_base.py b/services/api-server/src/simcore_service_api_server/utils/client_base.py index 779d9285907..d83cae1d4b0 100644 --- a/services/api-server/src/simcore_service_api_server/utils/client_base.py +++ b/services/api-server/src/simcore_service_api_server/utils/client_base.py @@ -30,7 +30,7 @@ async def is_responsive(self) -> bool: resp.raise_for_status() return True except (httpx.HTTPStatusError, httpx.RequestError) as err: - _logger.error("%s not responsive: %s", self.service_name, err) + _logger.debug("%s not responsive: %s", self.service_name, err) return False ping = is_responsive # alias From e82078844825eeacd61dbff27131bc80267f1eee Mon Sep 17 00:00:00 2001 From: Pedro Crespo-Valero <32402063+pcrespov@users.noreply.github.com> Date: Fri, 12 Jan 2024 16:31:28 +0100 Subject: [PATCH 3/8] errors --- .../simcore_service_api_server/core/errors.py | 53 ++++++++++++++++ .../api-server/tests/unit/test_core_errors.py | 63 +++++++++++++++++++ 2 files changed, 116 insertions(+) create mode 100644 services/api-server/tests/unit/test_core_errors.py diff --git a/services/api-server/src/simcore_service_api_server/core/errors.py b/services/api-server/src/simcore_service_api_server/core/errors.py index e69de29bb2d..f8cba0c762f 100644 --- a/services/api-server/src/simcore_service_api_server/core/errors.py +++ b/services/api-server/src/simcore_service_api_server/core/errors.py @@ -0,0 +1,53 @@ +from enum import auto + +import httpx +from models_library.utils.enums import StrAutoEnum +from pydantic.errors import PydanticErrorMixin +from servicelib.fastapi.httpx_utils import to_httpx_command + + +class _BaseAppError(PydanticErrorMixin, ValueError): + @classmethod + def get_full_class_name(cls) -> str: + # Can be used as unique code identifier + return f"{cls.__module__}.{cls.__name__}" + + +class BackendEnum(StrAutoEnum): + CATALOG = auto() + DIRECTOR = auto() + STORAGE = auto() + WEBSERVER = auto() + + +class BackendServiceError(_BaseAppError): + http_status_error: httpx.HTTPStatusError | None = None + service: BackendEnum + + msg_template = "{service} error" + + @classmethod + def from_httpx_status_error( + cls, service: BackendEnum, error: httpx.HTTPStatusError, **ctx + ) -> "BackendServiceError": + return cls(http_status_error=error, service=service, **ctx) + + def get_debug_message(self) -> str: + msg = f"{self}" + if http_status_error := getattr(self, "http_status_error", None): + resp = http_status_error.response + # request + msg += f"\n\t'{to_httpx_command(http_status_error.request)}'" + msg += f"\n\t'{resp.text}'" + # status, latency + msg += f"\n\t{resp.status_code}, {resp.elapsed.total_seconds()*1E6}us" + # response + return msg + + +class DirectorError(BackendServiceError): + service = BackendEnum.DIRECTOR + + +class WebServerError(BackendServiceError): + service = BackendEnum.WEBSERVER diff --git a/services/api-server/tests/unit/test_core_errors.py b/services/api-server/tests/unit/test_core_errors.py new file mode 100644 index 00000000000..4d5f3c3897c --- /dev/null +++ b/services/api-server/tests/unit/test_core_errors.py @@ -0,0 +1,63 @@ +# pylint: disable=redefined-outer-name +# pylint: disable=unused-argument +# pylint: disable=unused-variable +# pylint: disable=too-many-arguments + +from collections.abc import AsyncIterator, Iterator + +import httpx +import pytest +import respx +from fastapi import status +from httpx import AsyncClient +from simcore_service_api_server.core.errors import BackendEnum, BackendServiceError + + +@pytest.fixture +def base_url() -> str: + return f"https://{__name__}" + + +@pytest.fixture +def mock_server_api(base_url: str) -> Iterator[respx.MockRouter]: + with respx.mock( + base_url=base_url, + assert_all_called=False, + assert_all_mocked=True, # IMPORTANT: KEEP always True! + ) as mock: + + mock.get("/ok").respond(status.HTTP_200_OK) + mock.post(path__startswith="/fail").respond( + status.HTTP_500_INTERNAL_SERVER_ERROR, text="FAILURE" + ) + + yield mock + + +@pytest.fixture +async def client( + mock_server_api: respx.MockRouter, base_url: str +) -> AsyncIterator[AsyncClient]: + async with httpx.AsyncClient(base_url=base_url) as cli: + yield cli + + +async def test_backend_error(client: AsyncClient): + + try: + response = await client.post("/fail", params={"id": 3}, json={"x": 2}) + response.raise_for_status() + + except httpx.HTTPStatusError as err: + service_error = BackendServiceError.from_httpx_status_error( + service=BackendEnum.DIRECTOR, error=err + ) + + assert hasattr(service_error, "http_status_error") # auto-injected as context + + assert service_error.get_debug_message() + assert service_error.service == BackendEnum.DIRECTOR + assert ( + service_error.get_full_class_name() + == "simcore_service_api_server.core.errors.BackendServiceError" + ) From caf78af0a6564a16217a5265a50c5d239e2a39fb Mon Sep 17 00:00:00 2001 From: Pedro Crespo-Valero <32402063+pcrespov@users.noreply.github.com> Date: Fri, 12 Jan 2024 17:03:47 +0100 Subject: [PATCH 4/8] error handling --- .../simcore_service_api_server/core/errors.py | 10 +- .../services/director_v2.py | 148 +++++++++++------- .../api-server/tests/unit/test_core_errors.py | 7 +- 3 files changed, 100 insertions(+), 65 deletions(-) diff --git a/services/api-server/src/simcore_service_api_server/core/errors.py b/services/api-server/src/simcore_service_api_server/core/errors.py index f8cba0c762f..1ebf152d066 100644 --- a/services/api-server/src/simcore_service_api_server/core/errors.py +++ b/services/api-server/src/simcore_service_api_server/core/errors.py @@ -3,6 +3,7 @@ import httpx from models_library.utils.enums import StrAutoEnum from pydantic.errors import PydanticErrorMixin +from servicelib.error_codes import create_error_code from servicelib.fastapi.httpx_utils import to_httpx_command @@ -12,6 +13,9 @@ def get_full_class_name(cls) -> str: # Can be used as unique code identifier return f"{cls.__module__}.{cls.__name__}" + def get_error_code(self): + return create_error_code(self) + class BackendEnum(StrAutoEnum): CATALOG = auto() @@ -28,9 +32,9 @@ class BackendServiceError(_BaseAppError): @classmethod def from_httpx_status_error( - cls, service: BackendEnum, error: httpx.HTTPStatusError, **ctx + cls, error: httpx.HTTPStatusError, **ctx ) -> "BackendServiceError": - return cls(http_status_error=error, service=service, **ctx) + return cls(http_status_error=error, service=cls.service, **ctx) def get_debug_message(self) -> str: msg = f"{self}" @@ -38,10 +42,10 @@ def get_debug_message(self) -> str: resp = http_status_error.response # request msg += f"\n\t'{to_httpx_command(http_status_error.request)}'" + # response msg += f"\n\t'{resp.text}'" # status, latency msg += f"\n\t{resp.status_code}, {resp.elapsed.total_seconds()*1E6}us" - # response return msg diff --git a/services/api-server/src/simcore_service_api_server/services/director_v2.py b/services/api-server/src/simcore_service_api_server/services/director_v2.py index e0fdd1c41d2..15719015411 100644 --- a/services/api-server/src/simcore_service_api_server/services/director_v2.py +++ b/services/api-server/src/simcore_service_api_server/services/director_v2.py @@ -1,26 +1,30 @@ import logging from contextlib import contextmanager -from typing import Any, ClassVar +from typing import Any, ClassVar, TypeAlias from uuid import UUID +import httpx from fastapi import FastAPI from fastapi.exceptions import HTTPException -from httpx import HTTPStatusError, codes +from httpx import codes from models_library.clusters import ClusterID from models_library.projects_nodes import NodeID from models_library.projects_pipeline import ComputationTask from models_library.projects_state import RunningState from pydantic import AnyHttpUrl, AnyUrl, BaseModel, Field, PositiveInt, parse_raw_as +from servicelib.error_codes import create_error_code +from servicelib.fastapi.httpx_utils import to_httpx_command from simcore_service_api_server.db.repositories.groups_extra_properties import ( GroupsExtraPropertiesRepository, ) from starlette import status +from ..core.errors import DirectorError from ..core.settings import DirectorV2Settings from ..models.schemas.jobs import PercentageInt from ..utils.client_base import BaseServiceClientApi, setup_client_instance -logger = logging.getLogger(__name__) +_logger = logging.getLogger(__name__) # API MODELS --------------------------------------------- @@ -59,10 +63,8 @@ class TaskLogFileGet(BaseModel): ) -NodeName = str -DownloadLink = AnyUrl - -# API CLASS --------------------------------------------- +NodeName: TypeAlias = str +DownloadLink: TypeAlias = AnyUrl @contextmanager @@ -70,32 +72,53 @@ def _handle_errors_context(project_id: UUID): try: yield - # except ValidationError - except HTTPStatusError as err: - msg = ( - f"Failed {err.request.url} with status={err.response.status_code}: {err.response.json()}", - ) + except httpx.HTTPStatusError as err: if codes.is_client_error(err.response.status_code): - # client errors are mapped - logger.debug(msg) if err.response.status_code == status.HTTP_404_NOT_FOUND: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Job {project_id} not found", ) from err - raise + # FIXME: what to do with these + raise DirectorError.from_httpx_status_error(err) from err - # server errors are logged and re-raised as 503 - assert codes.is_server_error(err.response.status_code) # nosec + else: + # server errors are logged and re-raised as 503 + assert codes.is_server_error(err.response.status_code) # nosec - logger.exception( - "director-v2 service failed: %s. Re-rasing as service unavailable (503)", - msg, + oec = create_error_code(err) + err_detail = ( + f"Service handling job '{project_id}' unexpectedly failed [{oec}]" + ) + _logger.exception( + "%s: %s", + err_detail, + DirectorError.from_httpx_status_error(err).get_debug_message(), + extra={"error_code": oec}, + ) + + raise HTTPException( + status_code=status.HTTP_503_SERVICE_UNAVAILABLE, + detail=err_detail, + ) from err + + except httpx.TimeoutException as err: + oec = create_error_code(err) + err_detail = ( + f"Service handling job operation on '{project_id}' timed out [{oec}]" + ) + _logger.exception( + "%s: %s", + err_detail, + to_httpx_command(err.request), + extra={"error_code": oec}, ) + + # SEE https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/504 raise HTTPException( - status_code=status.HTTP_503_SERVICE_UNAVAILABLE, - detail="Director service failed", + status_code=status.HTTP_504_GATEWAY_TIMEOUT, + detail=err_detail, ) from err @@ -126,7 +149,9 @@ async def start_computation( groups_extra_properties_repository: GroupsExtraPropertiesRepository, cluster_id: ClusterID | None = None, ) -> ComputationTaskGet: + with _handle_errors_context(project_id): + extras = {} use_on_demand_clusters = ( @@ -155,56 +180,63 @@ async def start_computation( async def get_computation( self, project_id: UUID, user_id: PositiveInt ) -> ComputationTaskGet: - response = await self.client.get( - f"/v2/computations/{project_id}", - params={ - "user_id": user_id, - }, - ) - response.raise_for_status() - return ComputationTaskGet(**response.json()) + + with _handle_errors_context(project_id): + + response = await self.client.get( + f"/v2/computations/{project_id}", + params={ + "user_id": user_id, + }, + ) + response.raise_for_status() + + return ComputationTaskGet(**response.json()) async def stop_computation( self, project_id: UUID, user_id: PositiveInt ) -> ComputationTaskGet: - response = await self.client.post( - f"/v2/computations/{project_id}:stop", - json={ - "user_id": user_id, - }, - ) + with _handle_errors_context(project_id): + response = await self.client.post( + f"/v2/computations/{project_id}:stop", + json={ + "user_id": user_id, + }, + ) - return ComputationTaskGet(**response.json()) + return ComputationTaskGet(**response.json()) async def delete_computation(self, project_id: UUID, user_id: PositiveInt): - await self.client.request( - "DELETE", - f"/v2/computations/{project_id}", - json={ - "user_id": user_id, - "force": True, - }, - ) + with _handle_errors_context(project_id): + await self.client.request( + "DELETE", + f"/v2/computations/{project_id}", + json={ + "user_id": user_id, + "force": True, + }, + ) async def get_computation_logs( self, user_id: PositiveInt, project_id: UUID ) -> dict[NodeName, DownloadLink]: - response = await self.client.get( - f"/v2/computations/{project_id}/tasks/-/logfile", - params={ - "user_id": user_id, - }, - ) + with _handle_errors_context(project_id): + response = await self.client.get( + f"/v2/computations/{project_id}/tasks/-/logfile", + params={ + "user_id": user_id, + }, + ) - # probably not found - response.raise_for_status() + # probably not found + response.raise_for_status() - node_to_links: dict[NodeName, DownloadLink] = {} - for r in parse_raw_as(list[TaskLogFileGet], response.text or "[]"): - if r.download_link: - node_to_links[f"{r.task_id}"] = r.download_link + node_to_links: dict[NodeName, DownloadLink] = {} + for r in parse_raw_as(list[TaskLogFileGet], response.text or "[]"): + if r.download_link: + node_to_links[f"{r.task_id}"] = r.download_link - return node_to_links + return node_to_links # MODULES APP SETUP ------------------------------------------------------------- diff --git a/services/api-server/tests/unit/test_core_errors.py b/services/api-server/tests/unit/test_core_errors.py index 4d5f3c3897c..8f5394c02e7 100644 --- a/services/api-server/tests/unit/test_core_errors.py +++ b/services/api-server/tests/unit/test_core_errors.py @@ -10,7 +10,7 @@ import respx from fastapi import status from httpx import AsyncClient -from simcore_service_api_server.core.errors import BackendEnum, BackendServiceError +from simcore_service_api_server.core.errors import BackendEnum, DirectorError @pytest.fixture @@ -49,9 +49,8 @@ async def test_backend_error(client: AsyncClient): response.raise_for_status() except httpx.HTTPStatusError as err: - service_error = BackendServiceError.from_httpx_status_error( - service=BackendEnum.DIRECTOR, error=err - ) + + service_error = DirectorError.from_httpx_status_error(err) assert hasattr(service_error, "http_status_error") # auto-injected as context From 5157d4e3e90f1adf846604dcfad7d552506ce9c9 Mon Sep 17 00:00:00 2001 From: Pedro Crespo-Valero <32402063+pcrespov@users.noreply.github.com> Date: Fri, 12 Jan 2024 17:05:35 +0100 Subject: [PATCH 5/8] minor --- .../api-server/src/simcore_service_api_server/core/errors.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/services/api-server/src/simcore_service_api_server/core/errors.py b/services/api-server/src/simcore_service_api_server/core/errors.py index 1ebf152d066..2e4e62591d8 100644 --- a/services/api-server/src/simcore_service_api_server/core/errors.py +++ b/services/api-server/src/simcore_service_api_server/core/errors.py @@ -3,7 +3,6 @@ import httpx from models_library.utils.enums import StrAutoEnum from pydantic.errors import PydanticErrorMixin -from servicelib.error_codes import create_error_code from servicelib.fastapi.httpx_utils import to_httpx_command @@ -13,9 +12,6 @@ def get_full_class_name(cls) -> str: # Can be used as unique code identifier return f"{cls.__module__}.{cls.__name__}" - def get_error_code(self): - return create_error_code(self) - class BackendEnum(StrAutoEnum): CATALOG = auto() From 687222bd5084eb2819f7ff828d4ab52bfa2f6393 Mon Sep 17 00:00:00 2001 From: Pedro Crespo-Valero <32402063+pcrespov@users.noreply.github.com> Date: Sat, 13 Jan 2024 14:01:27 +0100 Subject: [PATCH 6/8] adds error handlers --- .../services/director_v2.py | 44 ++++++++++++------- 1 file changed, 28 insertions(+), 16 deletions(-) diff --git a/services/api-server/src/simcore_service_api_server/services/director_v2.py b/services/api-server/src/simcore_service_api_server/services/director_v2.py index 15719015411..4e2ffc1f1fa 100644 --- a/services/api-server/src/simcore_service_api_server/services/director_v2.py +++ b/services/api-server/src/simcore_service_api_server/services/director_v2.py @@ -104,6 +104,7 @@ def _handle_errors_context(project_id: UUID): ) from err except httpx.TimeoutException as err: + # TODO: refer resource? oec = create_error_code(err) err_detail = ( f"Service handling job operation on '{project_id}' timed out [{oec}]" @@ -121,6 +122,21 @@ def _handle_errors_context(project_id: UUID): detail=err_detail, ) from err + except httpx.HTTPError as err: + oec = create_error_code(err) + err_detail = f"Unexpected error while processing job '{project_id}' [{oec}]" + _logger.exception( + "%s: %s", + err_detail, + to_httpx_command(err.request), + extra={"error_code": oec}, + ) + + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=err_detail, + ) from err + class DirectorV2Api(BaseServiceClientApi): async def create_computation( @@ -129,17 +145,18 @@ async def create_computation( user_id: PositiveInt, product_name: str, ) -> ComputationTaskGet: - response = await self.client.post( - "/v2/computations", - json={ - "user_id": user_id, - "project_id": str(project_id), - "start_pipeline": False, - "product_name": product_name, - }, - ) - response.raise_for_status() - return ComputationTaskGet(**response.json()) + with _handle_errors_context(project_id): + response = await self.client.post( + "/v2/computations", + json={ + "user_id": user_id, + "project_id": str(project_id), + "start_pipeline": False, + "product_name": product_name, + }, + ) + response.raise_for_status() + return ComputationTaskGet(**response.json()) async def start_computation( self, @@ -149,11 +166,8 @@ async def start_computation( groups_extra_properties_repository: GroupsExtraPropertiesRepository, cluster_id: ClusterID | None = None, ) -> ComputationTaskGet: - with _handle_errors_context(project_id): - extras = {} - use_on_demand_clusters = ( await groups_extra_properties_repository.use_on_demand_clusters( user_id, product_name @@ -180,9 +194,7 @@ async def start_computation( async def get_computation( self, project_id: UUID, user_id: PositiveInt ) -> ComputationTaskGet: - with _handle_errors_context(project_id): - response = await self.client.get( f"/v2/computations/{project_id}", params={ From e394ed386efe629b6081a17172d4d86fb53d1267 Mon Sep 17 00:00:00 2001 From: Pedro Crespo-Valero <32402063+pcrespov@users.noreply.github.com> Date: Sun, 28 Jan 2024 17:00:30 +0100 Subject: [PATCH 7/8] minor --- services/api-server/tests/unit/test_services_rabbitmq.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/services/api-server/tests/unit/test_services_rabbitmq.py b/services/api-server/tests/unit/test_services_rabbitmq.py index 906c95d90ee..c830db93705 100644 --- a/services/api-server/tests/unit/test_services_rabbitmq.py +++ b/services/api-server/tests/unit/test_services_rabbitmq.py @@ -96,9 +96,7 @@ def node_id(faker: Faker) -> NodeID: @pytest.fixture -async def log_distributor( - client: httpx.AsyncClient, app: FastAPI -) -> AsyncIterable[LogDistributor]: +async def log_distributor(client: httpx.AsyncClient, app: FastAPI) -> LogDistributor: return get_log_distributor(app) From 68e520fb6f7eefd14d61bdaad6b5f5349ad370d6 Mon Sep 17 00:00:00 2001 From: Pedro Crespo-Valero <32402063+pcrespov@users.noreply.github.com> Date: Sun, 28 Jan 2024 17:02:08 +0100 Subject: [PATCH 8/8] ruff: exceptions naming --- .../api/errors/log_handling_error.py | 12 ++++++------ .../simcore_service_api_server/core/application.py | 6 ++---- .../services/log_streaming.py | 14 +++++++------- .../tests/unit/test_services_rabbitmq.py | 8 ++++---- 4 files changed, 19 insertions(+), 21 deletions(-) diff --git a/services/api-server/src/simcore_service_api_server/api/errors/log_handling_error.py b/services/api-server/src/simcore_service_api_server/api/errors/log_handling_error.py index 3c4c5bbb467..c18ab81c35d 100644 --- a/services/api-server/src/simcore_service_api_server/api/errors/log_handling_error.py +++ b/services/api-server/src/simcore_service_api_server/api/errors/log_handling_error.py @@ -3,21 +3,21 @@ from starlette.responses import JSONResponse from ...services.log_streaming import ( - LogDistributionBaseException, - LogStreamerNotRegistered, - LogStreamerRegistionConflict, + LogDistributionBaseError, + LogStreamerNotRegisteredError, + LogStreamerRegistionConflictError, ) from .http_error import create_error_json_response async def log_handling_error_handler( - _: Request, exc: LogDistributionBaseException + _: Request, exc: LogDistributionBaseError ) -> JSONResponse: msg = f"{exc}" status_code: int = 500 - if isinstance(exc, LogStreamerNotRegistered): + if isinstance(exc, LogStreamerNotRegisteredError): status_code = status.HTTP_500_INTERNAL_SERVER_ERROR - elif isinstance(exc, LogStreamerRegistionConflict): + elif isinstance(exc, LogStreamerRegistionConflictError): status_code = status.HTTP_409_CONFLICT return create_error_json_response(msg, status_code=status_code) diff --git a/services/api-server/src/simcore_service_api_server/core/application.py b/services/api-server/src/simcore_service_api_server/core/application.py index 581735cf54e..675341d00a5 100644 --- a/services/api-server/src/simcore_service_api_server/core/application.py +++ b/services/api-server/src/simcore_service_api_server/core/application.py @@ -12,9 +12,7 @@ from simcore_service_api_server.api.errors.log_handling_error import ( log_handling_error_handler, ) -from simcore_service_api_server.services.log_streaming import ( - LogDistributionBaseException, -) +from simcore_service_api_server.services.log_streaming import LogDistributionBaseError from starlette import status from starlette.exceptions import HTTPException @@ -108,7 +106,7 @@ def init_app(settings: ApplicationSettings | None = None) -> FastAPI: app.add_exception_handler(HTTPException, http_error_handler) app.add_exception_handler(RequestValidationError, http422_error_handler) app.add_exception_handler(HTTPStatusError, httpx_client_error_handler) - app.add_exception_handler(LogDistributionBaseException, log_handling_error_handler) + app.add_exception_handler(LogDistributionBaseError, log_handling_error_handler) app.add_exception_handler(CustomBaseError, custom_error_handler) # SEE https://docs.python.org/3/library/exceptions.html#exception-hierarchy diff --git a/services/api-server/src/simcore_service_api_server/services/log_streaming.py b/services/api-server/src/simcore_service_api_server/services/log_streaming.py index acc8bdf075c..3ef2fef416c 100644 --- a/services/api-server/src/simcore_service_api_server/services/log_streaming.py +++ b/services/api-server/src/simcore_service_api_server/services/log_streaming.py @@ -16,15 +16,15 @@ _SLEEP_SECONDS_BEFORE_CHECK_JOB_STATUS: Final[PositiveInt] = 10 -class LogDistributionBaseException(Exception): +class LogDistributionBaseError(Exception): pass -class LogStreamerNotRegistered(LogDistributionBaseException): +class LogStreamerNotRegisteredError(LogDistributionBaseError): pass -class LogStreamerRegistionConflict(LogDistributionBaseException): +class LogStreamerRegistionConflictError(LogDistributionBaseError): pass @@ -63,7 +63,7 @@ async def _distribute_logs(self, data: bytes): callback = self._log_streamers.get(item.job_id) if callback is None: msg = f"Could not forward log because a logstreamer associated with job_id={item.job_id} was not registered" - raise LogStreamerNotRegistered(msg) + raise LogStreamerNotRegisteredError(msg) await callback(item) return True @@ -72,7 +72,7 @@ async def register( ): if job_id in self._log_streamers: msg = f"A stream was already connected to {job_id=}. Only a single stream can be connected at the time" - raise LogStreamerRegistionConflict(msg) + raise LogStreamerRegistionConflictError(msg) self._log_streamers[job_id] = callback await self._rabbit_client.add_topics( LoggerRabbitMessage.get_channel_name(), topics=[f"{job_id}.*"] @@ -81,7 +81,7 @@ async def register( async def deregister(self, job_id: JobID): if job_id not in self._log_streamers: msg = f"No stream was connected to {job_id=}." - raise LogStreamerNotRegistered(msg) + raise LogStreamerNotRegisteredError(msg) await self._rabbit_client.remove_topics( LoggerRabbitMessage.get_channel_name(), topics=[f"{job_id}.*"] ) @@ -128,7 +128,7 @@ async def _project_done(self) -> bool: async def log_generator(self) -> AsyncIterable[str]: if not self._is_registered: msg = f"LogStreamer for job_id={self._job_id} is not correctly registered" - raise LogStreamerNotRegistered(msg) + raise LogStreamerNotRegisteredError(msg) last_log_time: datetime | None = None while True: while self._queue.empty(): diff --git a/services/api-server/tests/unit/test_services_rabbitmq.py b/services/api-server/tests/unit/test_services_rabbitmq.py index c830db93705..8b81c95acb7 100644 --- a/services/api-server/tests/unit/test_services_rabbitmq.py +++ b/services/api-server/tests/unit/test_services_rabbitmq.py @@ -41,8 +41,8 @@ from simcore_service_api_server.services.log_streaming import ( LogDistributor, LogStreamer, - LogStreamerNotRegistered, - LogStreamerRegistionConflict, + LogStreamerNotRegisteredError, + LogStreamerRegistionConflictError, ) pytest_simcore_core_services_selection = [ @@ -217,7 +217,7 @@ async def _(job_log: JobLog): pass await log_distributor.register(project_id, _) - with pytest.raises(LogStreamerRegistionConflict): + with pytest.raises(LogStreamerRegistionConflictError): await log_distributor.register(project_id, _) await log_distributor.deregister(project_id) @@ -413,6 +413,6 @@ async def test_log_generator(mocker: MockFixture, faker: Faker): async def test_log_generator_context(mocker: MockFixture, faker: Faker): log_streamer = LogStreamer(user_id=3, director2_api=None, job_id=None, log_distributor=None, max_log_check_seconds=1) # type: ignore - with pytest.raises(LogStreamerNotRegistered): + with pytest.raises(LogStreamerNotRegisteredError): async for log in log_streamer.log_generator(): print(log)