diff --git a/api/specs/web-server/_computations.py b/api/specs/web-server/_computations.py index 2458f499d54..36600f1efac 100644 --- a/api/specs/web-server/_computations.py +++ b/api/specs/web-server/_computations.py @@ -29,23 +29,16 @@ async def get_computation(project_id: ProjectID): "/computations/{project_id}:start", response_model=Envelope[_ComputationStarted], responses={ - status.HTTP_404_NOT_FOUND: { - "description": "Project/wallet/pricing details not found" - }, status.HTTP_402_PAYMENT_REQUIRED: { - "description": "Insufficient osparc credits" - }, - status.HTTP_406_NOT_ACCEPTABLE: { - "description": "Cluster not found", + "description": "Insufficient credits to run computation" }, - status.HTTP_503_SERVICE_UNAVAILABLE: { - "description": "Service not available", - }, - status.HTTP_422_UNPROCESSABLE_ENTITY: { - "description": "Configuration error", + status.HTTP_404_NOT_FOUND: { + "description": "Project/wallet/pricing details were not found" }, - status.HTTP_402_PAYMENT_REQUIRED: {"description": "Payment required"}, + status.HTTP_406_NOT_ACCEPTABLE: {"description": "Cluster not found"}, status.HTTP_409_CONFLICT: {"description": "Project already started"}, + status.HTTP_422_UNPROCESSABLE_ENTITY: {"description": "Configuration error"}, + status.HTTP_503_SERVICE_UNAVAILABLE: {"description": "Service not available"}, }, ) async def start_computation(project_id: ProjectID, _start: ComputationStart): diff --git a/packages/pytest-simcore/src/pytest_simcore/redis_service.py b/packages/pytest-simcore/src/pytest_simcore/redis_service.py index 3a84f0ceb03..a94a7a38223 100644 --- a/packages/pytest-simcore/src/pytest_simcore/redis_service.py +++ b/packages/pytest-simcore/src/pytest_simcore/redis_service.py @@ -69,7 +69,7 @@ async def redis_client( yield client await client.flushall() - await client.close(close_connection_pool=True) + await client.aclose(close_connection_pool=True) @pytest.fixture() @@ -86,7 +86,7 @@ async def redis_locks_client( yield client await 
client.flushall() - await client.close(close_connection_pool=True) + await client.aclose(close_connection_pool=True) @tenacity.retry( @@ -103,4 +103,4 @@ async def wait_till_redis_responsive(redis_url: URL | str) -> None: msg = f"{redis_url=} not available" raise ConnectionError(msg) finally: - await client.close(close_connection_pool=True) + await client.aclose(close_connection_pool=True) diff --git a/packages/service-library/src/servicelib/aiohttp/rest_middlewares.py b/packages/service-library/src/servicelib/aiohttp/rest_middlewares.py index 8c12f7b3491..e68db2cbb46 100644 --- a/packages/service-library/src/servicelib/aiohttp/rest_middlewares.py +++ b/packages/service-library/src/servicelib/aiohttp/rest_middlewares.py @@ -18,7 +18,7 @@ from .rest_models import ErrorItemType, ErrorType, LogMessageType from .rest_responses import ( create_data_response, - create_error_response, + create_http_error, is_enveloped_from_map, is_enveloped_from_text, wrap_as_envelope, @@ -44,7 +44,7 @@ def error_middleware_factory( _is_prod: bool = is_production_environ() def _process_and_raise_unexpected_error(request: web.BaseRequest, err: Exception): - resp = create_error_response( + http_error = create_http_error( err, "Unexpected Server error", web.HTTPInternalServerError, @@ -58,11 +58,11 @@ def _process_and_raise_unexpected_error(request: web.BaseRequest, err: Exception request.remote, request.method, request.path, - resp.status, + http_error.status, exc_info=err, stack_info=True, ) - raise resp + raise http_error @web.middleware async def _middleware_handler(request: web.Request, handler: Handler): @@ -115,22 +115,22 @@ async def _middleware_handler(request: web.Request, handler: Handler): raise except NotImplementedError as err: - error_response = create_error_response( + http_error = create_http_error( err, f"{err}", web.HTTPNotImplemented, skip_internal_error_details=_is_prod, ) - raise error_response from err + raise http_error from err except asyncio.TimeoutError as err: - 
error_response = create_error_response( + http_error = create_http_error( err, f"{err}", web.HTTPGatewayTimeout, skip_internal_error_details=_is_prod, ) - raise error_response from err + raise http_error from err except Exception as err: # pylint: disable=broad-except _process_and_raise_unexpected_error(request, err) diff --git a/packages/service-library/src/servicelib/aiohttp/rest_models.py b/packages/service-library/src/servicelib/aiohttp/rest_models.py index f35cabe4394..36902f17b77 100644 --- a/packages/service-library/src/servicelib/aiohttp/rest_models.py +++ b/packages/service-library/src/servicelib/aiohttp/rest_models.py @@ -27,4 +27,4 @@ class ErrorType: logs: list[LogMessageType] = field(default_factory=list) errors: list[ErrorItemType] = field(default_factory=list) status: int = 400 - message: str = "Unexpected client error" + message: str = "Unexpected error" diff --git a/packages/service-library/src/servicelib/aiohttp/rest_responses.py b/packages/service-library/src/servicelib/aiohttp/rest_responses.py index 313590a22bc..569ab56fbca 100644 --- a/packages/service-library/src/servicelib/aiohttp/rest_responses.py +++ b/packages/service-library/src/servicelib/aiohttp/rest_responses.py @@ -13,6 +13,7 @@ from servicelib.aiohttp.status import HTTP_200_OK from ..mimetype_constants import MIMETYPE_APPLICATION_JSON +from ..status_utils import get_code_description from .rest_models import ErrorItemType, ErrorType _ENVELOPE_KEYS = ("data", "error") @@ -65,18 +66,20 @@ def create_data_response( response = web.json_response(payload, dumps=json_dumps, status=status) except (TypeError, ValueError) as err: - response = create_error_response( - [ - err, - ], - str(err), - web.HTTPInternalServerError, - skip_internal_error_details=skip_internal_error_details, + response = exception_to_response( + create_http_error( + [ + err, + ], + str(err), + web.HTTPInternalServerError, + skip_internal_error_details=skip_internal_error_details, + ) ) return response -def 
create_error_response( +def create_http_error( errors: list[Exception] | Exception, reason: str | None = None, http_error_cls: type[HTTPError] = web.HTTPInternalServerError, @@ -94,18 +97,23 @@ def create_error_response( # TODO: guarantee no throw! is_internal_error: bool = http_error_cls == web.HTTPInternalServerError + default_message = reason or get_code_description(http_error_cls.status_code) if is_internal_error and skip_internal_error_details: error = ErrorType( errors=[], status=http_error_cls.status_code, + message=default_message, ) else: + items = [ErrorItemType.from_error(err) for err in errors] error = ErrorType( - errors=[ErrorItemType.from_error(err) for err in errors], + errors=items, status=http_error_cls.status_code, + message=items[0].message if items else default_message, ) + assert not http_error_cls.empty_body # nosec payload = wrap_as_envelope(error=asdict(error)) return http_error_cls( @@ -115,6 +123,18 @@ def create_error_response( ) +def exception_to_response(exc: HTTPError) -> web.Response: + # Returning web.HTTPException is deprecated so here we have a converter to a response + # so it can be used as + # SEE https://github.com/aio-libs/aiohttp/issues/2415 + return web.Response( + status=exc.status, + headers=exc.headers, + reason=exc.reason, + text=exc.text, + ) + + # Inverse map from code to HTTPException classes def _collect_http_exceptions(exception_cls: type[HTTPException] = HTTPException): def _pred(obj) -> bool: diff --git a/packages/service-library/src/servicelib/utils.py b/packages/service-library/src/servicelib/utils.py index 0cd9c89613e..0f96e7af3a0 100644 --- a/packages/service-library/src/servicelib/utils.py +++ b/packages/service-library/src/servicelib/utils.py @@ -4,19 +4,24 @@ I order to avoid cyclic dependences, please DO NOT IMPORT ANYTHING from . 
""" + import asyncio import logging import os import socket from collections.abc import Awaitable, Coroutine, Generator, Iterable from pathlib import Path -from typing import Any, Final, cast +from typing import Any, AsyncGenerator, AsyncIterable, Final, TypeVar, cast import toolz from pydantic import NonNegativeInt _logger = logging.getLogger(__name__) +_DEFAULT_GATHER_TASKS_GROUP_PREFIX: Final[str] = "gathered" +_DEFAULT_LOGGER: Final[logging.Logger] = _logger +_DEFAULT_LIMITED_CONCURRENCY: Final[int] = 1 + def is_production_environ() -> bool: """ @@ -175,3 +180,144 @@ def unused_port() -> int: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.bind(("127.0.0.1", 0)) return cast(int, s.getsockname()[1]) + + +T = TypeVar("T") + + +async def limited_as_completed( + awaitables: Iterable[Awaitable[T]] | AsyncIterable[Awaitable[T]], + *, + limit: int = _DEFAULT_LIMITED_CONCURRENCY, + tasks_group_prefix: str | None = None, +) -> AsyncGenerator[asyncio.Future[T], None]: + """Runs awaitables using limited concurrent tasks and returns + result futures unordered. + + Arguments: + awaitables -- The awaitables to limit the concurrency of. + + Keyword Arguments: + limit -- The maximum number of awaitables to run concurrently. + 0 or negative values disables the limit. (default: {1}) + tasks_group_prefix -- The prefix to use for the name of the asyncio tasks group. + If None, no name is used. (default: {None}) + + Returns: + nothing + + Yields: + Future[T]: the future of the awaitables as they appear. 
+ + + """ + try: + awaitable_iterator = aiter(awaitables) # type: ignore[arg-type] + is_async = True + except TypeError: + assert isinstance(awaitables, Iterable) # nosec + awaitable_iterator = iter(awaitables) # type: ignore[assignment] + is_async = False + + completed_all_awaitables = False + pending_futures: set[asyncio.Future] = set() + + try: + while pending_futures or not completed_all_awaitables: + while ( + limit < 1 or len(pending_futures) < limit + ) and not completed_all_awaitables: + try: + aw = ( + await anext(awaitable_iterator) + if is_async + else next(awaitable_iterator) # type: ignore[call-overload] + ) + future = asyncio.ensure_future(aw) + if tasks_group_prefix: + future.set_name(f"{tasks_group_prefix}-{future.get_name()}") + pending_futures.add(future) + except (StopIteration, StopAsyncIteration): # noqa: PERF203 + completed_all_awaitables = True + if not pending_futures: + return + done, pending_futures = await asyncio.wait( + pending_futures, return_when=asyncio.FIRST_COMPLETED + ) + + for future in done: + yield future + except asyncio.CancelledError: + for future in pending_futures: + future.cancel() + await asyncio.gather(*pending_futures, return_exceptions=True) + raise + + +async def _wrapped( + awaitable: Awaitable[T], *, index: int, reraise: bool, logger: logging.Logger +) -> tuple[int, T | BaseException]: + try: + return index, await awaitable + except asyncio.CancelledError: + logger.debug( + "Cancelled %i-th concurrent task %s", + index + 1, + f"{awaitable=}", + ) + raise + except BaseException as exc: # pylint: disable=broad-exception-caught + logger.warning( + "Error in %i-th concurrent task %s: %s", + index + 1, + f"{awaitable=}", + f"{exc=}", + ) + if reraise: + raise + return index, exc + + +async def limited_gather( + *awaitables: Awaitable[T], + reraise: bool = True, + log: logging.Logger = _DEFAULT_LOGGER, + limit: int = _DEFAULT_LIMITED_CONCURRENCY, + tasks_group_prefix: str | None = None, +) -> list[T | BaseException | 
None]: + """runs all the awaitables using the limited concurrency and returns them in the same order + + Arguments: + awaitables -- The awaitables to limit the concurrency of. + + Keyword Arguments: + limit -- The maximum number of awaitables to run concurrently. + setting 0 or negative values disable (default: {1}) + reraise -- if True will raise at the first exception + The remaining tasks will continue as in standard asyncio gather. + If False, then the exceptions will be returned (default: {True}) + log -- the logger to use for logging the exceptions (default: {_logger}) + tasks_group_prefix -- The prefix to use for the name of the asyncio tasks group. + If None, 'gathered' prefix is used. (default: {None}) + + Returns: + the results of the awaitables keeping the order + + special thanks to: https://death.andgravity.com/limit-concurrency + """ + + indexed_awaitables = [ + _wrapped(awaitable, reraise=reraise, index=index, logger=log) + for index, awaitable in enumerate(awaitables) + ] + + results: list[T | BaseException | None] = [None] * len(indexed_awaitables) + async for future in limited_as_completed( + indexed_awaitables, + limit=limit, + tasks_group_prefix=tasks_group_prefix or _DEFAULT_GATHER_TASKS_GROUP_PREFIX, + ): + index, result = await future + results[index] = result + + return results diff --git a/packages/service-library/tests/aiohttp/test_rest_responses.py b/packages/service-library/tests/aiohttp/test_rest_responses.py index ce47777cd4f..7077a93cb0f 100644 --- a/packages/service-library/tests/aiohttp/test_rest_responses.py +++ b/packages/service-library/tests/aiohttp/test_rest_responses.py @@ -5,6 +5,7 @@ import itertools import pytest +from aiohttp import web from aiohttp.web_exceptions import ( HTTPBadRequest, HTTPError, @@ -14,10 +15,14 @@ HTTPNotModified, HTTPOk, ) +from servicelib.aiohttp import status from servicelib.aiohttp.rest_responses import ( _STATUS_CODE_TO_HTTP_ERRORS, + create_http_error, + exception_to_response, get_http_error, ) 
+from servicelib.mimetype_constants import MIMETYPE_APPLICATION_JSON # @@ -53,3 +58,29 @@ def test_collected_http_errors_map(status_code: int, http_error_cls: type[HTTPEr assert http_error_cls != HTTPError assert issubclass(http_error_cls, HTTPError) + + +@pytest.mark.parametrize("skip_details", [True, False]) +def tests_exception_to_response(skip_details: bool): + exception = create_http_error( + errors=[RuntimeError("foo")], + reason="Something whent wrong", + http_error_cls=web.HTTPInternalServerError, + skip_internal_error_details=skip_details, + ) + + # For now until deprecated SEE https://github.com/aio-libs/aiohttp/issues/2415 + assert isinstance(exception, Exception) + assert isinstance(exception, web.Response) + assert hasattr(exception, "__http_exception__") + + # until they have exception.make_response(), we user + response = exception_to_response(exception) + assert isinstance(response, web.Response) + assert not isinstance(response, Exception) + assert not hasattr(response, "__http_exception__") + + assert response.content_type == MIMETYPE_APPLICATION_JSON + assert response.status == status.HTTP_500_INTERNAL_SERVER_ERROR + assert response.text + assert response.body diff --git a/packages/service-library/tests/test_archiving_utils.py b/packages/service-library/tests/test_archiving_utils.py index 84bcafb9572..f6886ea509a 100644 --- a/packages/service-library/tests/test_archiving_utils.py +++ b/packages/service-library/tests/test_archiving_utils.py @@ -14,7 +14,7 @@ from concurrent.futures import ProcessPoolExecutor from dataclasses import dataclass from pathlib import Path -from typing import Callable, Iterable, Iterator, Optional +from typing import Callable, Iterable, Iterator import pytest from faker import Faker @@ -23,7 +23,12 @@ from servicelib import archiving_utils from servicelib.archiving_utils import ArchiveError, archive_dir, unarchive_dir -from .test_utils import print_tree + +def _print_tree(path: Path, level=0): + tab = " " * level + 
print(f"{tab}{'+' if path.is_dir() else '-'} {path if level==0 else path.name}") + for p in path.glob("*"): + _print_tree(p, level + 1) @pytest.fixture @@ -96,7 +101,7 @@ def exclude_patterns_validation_dir(tmp_path: Path, faker: Faker) -> Path: (base_dir / "d1" / "sd1" / "f2.txt").write_text(faker.text()) print("exclude_patterns_validation_dir ---") - print_tree(base_dir) + _print_tree(base_dir) return base_dir @@ -174,7 +179,7 @@ def _escape_undecodable_path(path: Path) -> Path: async def assert_same_directory_content( dir_to_compress: Path, output_dir: Path, - inject_relative_path: Optional[Path] = None, + inject_relative_path: Path | None = None, unsupported_replace: bool = False, ) -> None: def _relative_path(input_path: Path) -> Path: diff --git a/packages/service-library/tests/test_archiving_utils_extra.py b/packages/service-library/tests/test_archiving_utils_extra.py index a428b5db4aa..bc2959c2e5b 100644 --- a/packages/service-library/tests/test_archiving_utils_extra.py +++ b/packages/service-library/tests/test_archiving_utils_extra.py @@ -13,7 +13,12 @@ unarchive_dir, ) -from .test_utils import print_tree + +def _print_tree(path: Path, level=0): + tab = " " * level + print(f"{tab}{'+' if path.is_dir() else '-'} {path if level==0 else path.name}") + for p in path.glob("*"): + _print_tree(p, level + 1) @pytest.fixture @@ -32,7 +37,7 @@ def state_dir(tmp_path) -> Path: (base_dir / "d1" / "d1_1" / "d1_1_1" / "f6").touch() print("state-dir ---") - print_tree(base_dir) + _print_tree(base_dir) # + /tmp/pytest-of-crespo/pytest-95/test_override_and_prune_from_a1/original # + empty # + d1 @@ -64,7 +69,7 @@ def new_state_dir(tmp_path) -> Path: # f6 deleted -> d1/d1_1/d2_2 remains empty and should be pruned print("new-state-dir ---") - print_tree(base_dir) + _print_tree(base_dir) # + /tmp/pytest-of-crespo/pytest-95/test_override_and_prune_from_a1/updated # + d1 # + d1_1 @@ -120,7 +125,7 @@ def test_override_and_prune_folder(state_dir: Path, new_state_dir: Path): 
assert old_paths != got_paths print("after ----") - print_tree(state_dir) + _print_tree(state_dir) @pytest.mark.parametrize( diff --git a/packages/service-library/tests/test_utils.py b/packages/service-library/tests/test_utils.py index 005465c35df..7bfcd4cee69 100644 --- a/packages/service-library/tests/test_utils.py +++ b/packages/service-library/tests/test_utils.py @@ -3,29 +3,37 @@ # pylint:disable=redefined-outer-name import asyncio -from collections.abc import Awaitable, Coroutine -from copy import copy -from pathlib import Path +from collections.abc import AsyncIterator, Awaitable, Coroutine, Iterator +from copy import copy, deepcopy from random import randint +from typing import NoReturn +from unittest import mock import pytest from faker import Faker -from servicelib.utils import ensure_ends_with, fire_and_forget_task, logged_gather +from pytest_mock import MockerFixture +from servicelib.utils import ( + ensure_ends_with, + fire_and_forget_task, + limited_as_completed, + limited_gather, + logged_gather, +) -async def _value_error(uid, *, delay=1): - await _succeed(delay) +async def _value_error(uid: int, *, delay: int = 1) -> NoReturn: + await _succeed(uid, delay=delay) msg = f"task#{uid}" raise ValueError(msg) -async def _runtime_error(uid, *, delay=1): - await _succeed(delay) +async def _runtime_error(uid: int, *, delay: int = 1) -> NoReturn: + await _succeed(uid, delay=delay) msg = f"task#{uid}" raise RuntimeError(msg) -async def _succeed(uid, *, delay=1): +async def _succeed(uid: int, *, delay: int = 1) -> int: print(f"task#{uid} begin") await asyncio.sleep(delay) print(f"task#{uid} end") @@ -33,19 +41,19 @@ async def _succeed(uid, *, delay=1): @pytest.fixture -def coros(): +def coros() -> list[Coroutine]: return [ _succeed(0), - _value_error(1, delay=2), + _value_error(1, delay=4), _succeed(2), - _runtime_error(3), - _value_error(4, delay=0), + _runtime_error(3, delay=0), + _value_error(4, delay=2), _succeed(5), ] @pytest.fixture -def 
mock_logger(mocker): +def mock_logger(mocker: MockerFixture) -> Iterator[mock.Mock]: mock_logger = mocker.Mock() yield mock_logger @@ -57,7 +65,11 @@ def mock_logger(mocker): ), "Expected all 3 errors ALWAYS logged as warnings" -async def test_logged_gather(event_loop, coros, mock_logger): +async def test_logged_gather( + event_loop: asyncio.AbstractEventLoop, + coros: list[Coroutine], + mock_logger: mock.Mock, +): with pytest.raises(ValueError) as excinfo: # noqa: PT011 await logged_gather(*coros, reraise=True, log=mock_logger) @@ -79,7 +91,7 @@ async def test_logged_gather(event_loop, coros, mock_logger): assert not task.cancelled() -async def test_logged_gather_wo_raising(coros, mock_logger): +async def test_logged_gather_wo_raising(coros: list[Coroutine], mock_logger: mock.Mock): results = await logged_gather(*coros, reraise=False, log=mock_logger) assert results[0] == 0 @@ -90,13 +102,6 @@ async def test_logged_gather_wo_raising(coros, mock_logger): assert results[5] == 5 -def print_tree(path: Path, level=0): - tab = " " * level - print(f"{tab}{'+' if path.is_dir() else '-'} {path if level==0 else path.name}") - for p in path.glob("*"): - print_tree(p, level + 1) - - @pytest.fixture() async def coroutine_that_cancels() -> asyncio.Future | Awaitable: async def _self_cancelling() -> None: @@ -142,7 +147,7 @@ async def test_fire_and_forget_cancellation_no_errors_raised( async def test_fire_and_forget_1000s_tasks(faker: Faker): tasks_collection = set() - async def _some_task(n: int): + async def _some_task(n: int) -> str: await asyncio.sleep(randint(1, 3)) return f"I'm great since I slept a bit, and by the way I'm task {n}" @@ -175,3 +180,127 @@ def test_ensure_ends_with(original: str, termination: str, expected: str): assert original_copy == original assert terminated_string.endswith(termination) assert terminated_string == expected + + +@pytest.fixture +def uids(faker: Faker) -> list[int]: + return [faker.pyint() for _ in range(10)] + + +@pytest.fixture +def 
long_delay() -> int: + return 10 + + +@pytest.fixture +def slow_successful_coros_list(uids: list[int], long_delay: int) -> list[Coroutine]: + return [_succeed(uid, delay=long_delay) for uid in uids] + + +@pytest.fixture +def successful_coros_list(uids: list[int]) -> list[Coroutine]: + return [_succeed(uid) for uid in uids] + + +@pytest.fixture +async def successful_coros_gen(uids: list[int]) -> AsyncIterator[Coroutine]: + async def as_async_iter(it): + for x in it: + yield x + + return as_async_iter(_succeed(uid) for uid in uids) + + +@pytest.fixture(params=["list", "generator"]) +async def successful_coros( + successful_coros_list: list[Coroutine], + successful_coros_gen: AsyncIterator[Coroutine], + request: pytest.FixtureRequest, +) -> list[Coroutine] | AsyncIterator[Coroutine]: + return successful_coros_list if request.param == "list" else successful_coros_gen + + +@pytest.mark.parametrize("limit", [0, 2, 5, 10]) +async def test_limited_as_completed( + uids: list[int], + successful_coros: list[Coroutine] | AsyncIterator[Coroutine], + limit: int, +): + expected_uids = deepcopy(uids) + async for future in limited_as_completed(successful_coros, limit=limit): + result = await future + assert result is not None + assert result in expected_uids + expected_uids.remove(result) + assert len(expected_uids) == 0 + + +async def test_limited_as_completed_empty_coros(): + results = [await result async for result in limited_as_completed([])] + assert results == [] + + +@pytest.mark.parametrize("limit", [0, 2, 5, 10]) +async def test_limited_gather_limits( + uids: list[int], + successful_coros_list: list[Coroutine], + limit: int, +): + results = await limited_gather(*successful_coros_list, limit=limit) + assert results == uids + + +async def test_limited_gather( + event_loop: asyncio.AbstractEventLoop, + coros: list[Coroutine], + mock_logger: mock.Mock, +): + with pytest.raises(RuntimeError) as excinfo: + await limited_gather(*coros, reraise=True, log=mock_logger, limit=0) + + 
# NOTE: #3 fails first + assert "task#3" in str(excinfo.value) + + # NOTE: only first error in the list is raised, since it is not RuntimeError, that task + assert isinstance(excinfo.value, RuntimeError) + + unfinished_tasks = [ + task + for task in asyncio.all_tasks(event_loop) + if task is not asyncio.current_task() + ] + final_results = await asyncio.gather(*unfinished_tasks, return_exceptions=True) + for result in final_results: + if isinstance(result, Exception): + assert isinstance(result, ValueError | RuntimeError) + + +async def test_limited_gather_wo_raising( + coros: list[Coroutine], mock_logger: mock.Mock +): + results = await limited_gather(*coros, reraise=False, log=mock_logger, limit=0) + + assert results[0] == 0 + assert isinstance(results[1], ValueError) + assert results[2] == 2 + assert isinstance(results[3], RuntimeError) + assert isinstance(results[4], ValueError) + assert results[5] == 5 + + +async def test_limited_gather_cancellation( + event_loop: asyncio.AbstractEventLoop, slow_successful_coros_list: list[Coroutine] +): + task = asyncio.create_task(limited_gather(*slow_successful_coros_list, limit=0)) + await asyncio.sleep(3) + task.cancel() + with pytest.raises(asyncio.CancelledError): + await task + + # check all coros are cancelled + unfinished_tasks = [ + task + for task in asyncio.all_tasks(event_loop) + if task is not asyncio.current_task() + ] + assert not unfinished_tasks diff --git a/scripts/mypy.bash b/scripts/mypy.bash index 0647a7de4ea..e25b504fc4b 100755 --- a/scripts/mypy.bash +++ b/scripts/mypy.bash @@ -29,13 +29,11 @@ echo_requirements() { --interactive \ --rm \ --user="$(id --user "$USER")":"$(id --group "$USER")" \ - --entrypoint="pip" \ + --entrypoint="uv" \ "$IMAGE_NAME" \ - --no-cache-dir freeze + --no-cache-dir pip freeze } - - run() { echo Using "$(docker run --rm "$IMAGE_NAME" --version)" echo Mypy config "${MYPY_CONFIG}" diff --git a/scripts/mypy/Dockerfile b/scripts/mypy/Dockerfile index 06a82234250..930ebf7110b 
100644 --- a/scripts/mypy/Dockerfile +++ b/scripts/mypy/Dockerfile @@ -1,13 +1,29 @@ # syntax=docker/dockerfile:1 ARG PYTHON_VERSION="3.10.14" -FROM python:${PYTHON_VERSION}-slim-bookworm as base +FROM python:${PYTHON_VERSION}-slim-bookworm AS base +# Sets utf-8 encoding for Python et al +ENV LANG=C.UTF-8 -COPY requirements.txt /requirements.txt +# Turns off writing .pyc files; superfluous on an ephemeral container. +ENV PYTHONDONTWRITEBYTECODE=1 \ + VIRTUAL_ENV=/home/scu/.venv +# Ensures that the python and pip executables used in the image will be +# those from our virtualenv. +ENV PATH="${VIRTUAL_ENV}/bin:$PATH" + + +# NOTE: install https://github.com/astral-sh/uv ultra-fast rust-based pip replacement RUN --mount=type=cache,mode=0755,target=/root/.cache/pip \ - pip install --upgrade pip \ - && pip install -r requirements.txt \ - && pip freeze + pip install uv~=0.1 + +RUN \ + --mount=type=cache,mode=0755,target=/root/.cache/uv \ + --mount=type=bind,source=./requirements.txt,target=requirements.txt \ + uv venv "${VIRTUAL_ENV}" \ + && uv pip install --upgrade pip wheel setuptools \ + && uv pip install -r requirements.txt \ + && uv pip list ENTRYPOINT ["mypy", "--config-file", "/config/mypy.ini", "--warn-unused-configs"] diff --git a/services/director-v2/src/simcore_service_director_v2/api/routes/computations.py b/services/director-v2/src/simcore_service_director_v2/api/routes/computations.py index 16d2a69ce91..cea6e18770d 100644 --- a/services/director-v2/src/simcore_service_director_v2/api/routes/computations.py +++ b/services/director-v2/src/simcore_service_director_v2/api/routes/computations.py @@ -98,7 +98,7 @@ router = APIRouter() -async def _check_pipeline_not_running( +async def _check_pipeline_not_running_or_raise_409( comp_tasks_repo: CompTasksRepository, computation: ComputationCreate ) -> None: pipeline_state = utils.get_pipeline_state_from_task_states( @@ -106,7 +106,7 @@ async def _check_pipeline_not_running( ) if 
utils.is_pipeline_running(pipeline_state): raise HTTPException( - status_code=status.HTTP_403_FORBIDDEN, + status_code=status.HTTP_409_CONFLICT, detail=f"Project {computation.project_id} already started, current state is {pipeline_state}", ) @@ -324,7 +324,7 @@ async def create_computation( # noqa: PLR0913 project: ProjectAtDB = await project_repo.get_project(computation.project_id) # check if current state allow to modify the computation - await _check_pipeline_not_running(comp_tasks_repo, computation) + await _check_pipeline_not_running_or_raise_409(comp_tasks_repo, computation) # create the complete DAG graph complete_dag = create_complete_dag(project.workbench) diff --git a/services/director-v2/tests/integration/01/test_computation_api.py b/services/director-v2/tests/integration/01/test_computation_api.py index 9c510d5f23f..110dbd5f89b 100644 --- a/services/director-v2/tests/integration/01/test_computation_api.py +++ b/services/director-v2/tests/integration/01/test_computation_api.py @@ -835,7 +835,7 @@ async def test_update_and_delete_computation( ), f"pipeline is not in the expected starting state but in {task_out.state}" # now try to update the pipeline, is expected to be forbidden - with pytest.raises(httpx.HTTPStatusError, match=f"{status.HTTP_403_FORBIDDEN}"): + with pytest.raises(httpx.HTTPStatusError, match=f"{status.HTTP_409_CONFLICT}"): await create_pipeline( async_client, project=sleepers_project, diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/nodeports.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/nodeports.py index 23c3d437cd5..44f8ad9bfab 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/nodeports.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/nodeports.py @@ -134,7 +134,7 @@ async def upload_outputs( # generic case let's create an archive # only the filtered out files will be zipped tmp_folder = Path( - await 
stack.enter_async_context(AioTemporaryDirectory()) + await stack.enter_async_context(AioTemporaryDirectory()) # type: ignore[arg-type] ) tmp_file = tmp_folder / f"{src_folder.stem}.zip" diff --git a/services/static-webserver/client/source/class/osparc/dashboard/GridButtonBase.js b/services/static-webserver/client/source/class/osparc/dashboard/GridButtonBase.js index 61292dd7171..3a7744e76a0 100644 --- a/services/static-webserver/client/source/class/osparc/dashboard/GridButtonBase.js +++ b/services/static-webserver/client/source/class/osparc/dashboard/GridButtonBase.js @@ -349,6 +349,14 @@ qx.Class.define("osparc.dashboard.GridButtonBase", { iconLayout.recheckSize(); }, + replaceIcon: function(newIcon) { + const plusIcon = this.getChildControl("icon"); + plusIcon.exclude(); + + const bodyLayout = this.getChildControl("body"); + bodyLayout.add(newIcon, {flex: 1}); + }, + /** * Event handler for the pointer over event. */ diff --git a/services/static-webserver/client/source/class/osparc/dashboard/NewStudies.js b/services/static-webserver/client/source/class/osparc/dashboard/NewStudies.js index bedba576986..de323bfaaa6 100644 --- a/services/static-webserver/client/source/class/osparc/dashboard/NewStudies.js +++ b/services/static-webserver/client/source/class/osparc/dashboard/NewStudies.js @@ -149,16 +149,12 @@ qx.Class.define("osparc.dashboard.NewStudies", { osparc.utils.Utils.setIdToWidget(newPlanButton, templateInfo.idToWidget); if (templateInfo.billable) { // replace the plus button with the creditsImage - const plusIcon = newPlanButton.getChildControl("icon"); - plusIcon.exclude(); const creditsImage = new osparc.desktop.credits.CreditsImage(); creditsImage.getChildControl("image").set({ - marginTop: 20, - maxWidth: 60, - maxHeight: 60 + width: 60, + height: 60 }) - const bodyLayout = newPlanButton.getChildControl("body"); - bodyLayout.add(creditsImage, {flex: 1}); + newPlanButton.replaceIcon(creditsImage); newPlanButton.addListener("execute", () => { const store = 
osparc.store.Store.getInstance(); diff --git a/services/static-webserver/client/source/class/osparc/desktop/StudyEditor.js b/services/static-webserver/client/source/class/osparc/desktop/StudyEditor.js index 4b0848288d2..b504551a869 100644 --- a/services/static-webserver/client/source/class/osparc/desktop/StudyEditor.js +++ b/services/static-webserver/client/source/class/osparc/desktop/StudyEditor.js @@ -472,7 +472,7 @@ qx.Class.define("osparc.desktop.StudyEditor", { this.getStudy().setPipelineRunning(false); }, this); req.addListener("fail", async e => { - if (e.getTarget().getStatus() == "403") { + if (e.getTarget().getStatus() == "409") { this.getStudyLogger().error(null, "Pipeline is already running"); } else if (e.getTarget().getStatus() == "422") { this.getStudyLogger().info(null, "The pipeline is up-to-date"); diff --git a/services/static-webserver/client/source/class/osparc/desktop/credits/CreditsIndicatorButton.js b/services/static-webserver/client/source/class/osparc/desktop/credits/CreditsIndicatorButton.js index a20fe6bf454..b612461d939 100644 --- a/services/static-webserver/client/source/class/osparc/desktop/credits/CreditsIndicatorButton.js +++ b/services/static-webserver/client/source/class/osparc/desktop/credits/CreditsIndicatorButton.js @@ -27,11 +27,8 @@ qx.Class.define("osparc.desktop.credits.CreditsIndicatorButton", { }); this.getChildControl("image").set({ - marginTop: 10, width: 24, - maxWidth: 24, - height: 24, - maxHeight: 24 + height: 24 }); this.__creditsContainer = new osparc.desktop.credits.CreditsNavBarContainer(); diff --git a/services/static-webserver/client/source/class/osparc/ui/basic/SVGImage.js b/services/static-webserver/client/source/class/osparc/ui/basic/SVGImage.js index b0c34688f47..b97ce96bd61 100644 --- a/services/static-webserver/client/source/class/osparc/ui/basic/SVGImage.js +++ b/services/static-webserver/client/source/class/osparc/ui/basic/SVGImage.js @@ -16,13 +16,13 @@ 
************************************************************************ */ /** - * Widget that displays an svg image and support changing its color. - * It is meant to be used for those images that are not available in the catalog of font icons we include. + * Widget that displays a SVG image and supports changing its color. + * It is meant to be used for those images that are not available in the catalogs of font icons we include. */ qx.Class.define("osparc.ui.basic.SVGImage", { - extend: qx.ui.core.Widget, + extend: osparc.ui.layout.CenteredGrid, /** * @param source @@ -30,15 +30,6 @@ qx.Class.define("osparc.ui.basic.SVGImage", { construct: function(source) { this.base(arguments); - this._setLayout(new qx.ui.layout.VBox()); - - this.set({ - allowGrowX: true, - allowGrowY: true, - alignX: "center", - alignY: "middle" - }); - if (source) { this.setSource(source); } @@ -151,9 +142,7 @@ qx.Class.define("osparc.ui.basic.SVGImage", { alignX: "center", alignY: "middle" }); - this._add(control, { - flex: 1 - }); + this.addCenteredWidget(control); break; } return control || this.base(arguments, id); @@ -166,7 +155,7 @@ qx.Class.define("osparc.ui.basic.SVGImage", { }, /** - * @param keywordOrRgb predefined keyword or rgb "0,255,0" + * @param keywordOrRgb {string} predefined keyword or rgb in the folloing format "0,255,0" */ __applyImageColor: function(keywordOrRgb) { let filterValue = this.self().keywordToCSSFilter(keywordOrRgb); diff --git a/services/static-webserver/client/source/class/osparc/ui/basic/Thumbnail.js b/services/static-webserver/client/source/class/osparc/ui/basic/Thumbnail.js index cc83795b37a..53e9d77285c 100644 --- a/services/static-webserver/client/source/class/osparc/ui/basic/Thumbnail.js +++ b/services/static-webserver/client/source/class/osparc/ui/basic/Thumbnail.js @@ -23,7 +23,7 @@ * |_____x_____|flex Spacer|_____x_____| */ qx.Class.define("osparc.ui.basic.Thumbnail", { - extend: qx.ui.core.Widget, + extend: osparc.ui.layout.CenteredGrid, /** * 
@param {String} source Source of the Image @@ -33,30 +33,6 @@ qx.Class.define("osparc.ui.basic.Thumbnail", { construct: function(source, maxWidth, maxHeight) { this.base(arguments); - const layout = new qx.ui.layout.Grid(); - layout.setRowFlex(0, 1); - layout.setRowFlex(2, 1); - layout.setColumnFlex(0, 1); - layout.setColumnFlex(2, 1); - this._setLayout(layout); - - [ - [0, 0], - [0, 1], - [0, 2], - [1, 0], - [1, 2], - [2, 0], - [2, 1], - [2, 2] - ].forEach(quad => { - const empty = new qx.ui.core.Spacer(); - this._add(empty, { - row: quad[0], - column: quad[1] - }); - }); - if (source) { this.setSource(source); } @@ -98,10 +74,7 @@ qx.Class.define("osparc.ui.basic.Thumbnail", { alignX: "center", alignY: "middle" }); - this._add(control, { - row: 1, - column: 1 - }); + this.addCenteredWidget(control); break; } return control || this.base(arguments, id); diff --git a/services/static-webserver/client/source/class/osparc/ui/layout/CenteredGrid.js b/services/static-webserver/client/source/class/osparc/ui/layout/CenteredGrid.js new file mode 100644 index 00000000000..8c4e98d351d --- /dev/null +++ b/services/static-webserver/client/source/class/osparc/ui/layout/CenteredGrid.js @@ -0,0 +1,64 @@ +/* ************************************************************************ + + osparc - the simcore frontend + + https://osparc.io + + Copyright: + 2024 IT'IS Foundation, https://itis.swiss + + License: + MIT: https://opensource.org/licenses/MIT + + Authors: + * Odei Maiz (odeimaiz) + +************************************************************************ */ + +/** + * Grid layout that shows an element well centered + * ___________________________________ + * |flex Spacer|flex Spacer|flex Spacer| + * |flex Spacer| element |flex Spacer| + * |flex Spacer|flex Spacer|flex Spacer| + */ +qx.Class.define("osparc.ui.layout.CenteredGrid", { + extend: qx.ui.container.Composite, + + construct: function() { + this.base(arguments); + + const layout = new qx.ui.layout.Grid(); + 
layout.setRowFlex(0, 1); + layout.setRowFlex(2, 1); + layout.setColumnFlex(0, 1); + layout.setColumnFlex(2, 1); + this._setLayout(layout); + + [ + [0, 0], + [0, 1], + [0, 2], + [1, 0], + [1, 2], + [2, 0], + [2, 1], + [2, 2] + ].forEach(quad => { + const empty = new qx.ui.core.Spacer(); + this._add(empty, { + row: quad[0], + column: quad[1] + }); + }); + }, + + members: { + addCenteredWidget: function(widget) { + this._add(widget, { + row: 1, + column: 1 + }); + } + } +}); diff --git a/services/web/server/src/simcore_service_webserver/director_v2/_handlers.py b/services/web/server/src/simcore_service_webserver/director_v2/_handlers.py index 6886c2408c2..ccd523fc750 100644 --- a/services/web/server/src/simcore_service_webserver/director_v2/_handlers.py +++ b/services/web/server/src/simcore_service_webserver/director_v2/_handlers.py @@ -10,7 +10,11 @@ from models_library.utils.json_serialization import json_dumps from pydantic import BaseModel, Field, ValidationError, parse_obj_as from pydantic.types import NonNegativeInt -from servicelib.aiohttp.rest_responses import create_error_response, get_http_error +from servicelib.aiohttp.rest_responses import ( + create_http_error, + exception_to_response, + get_http_error, +) from servicelib.common_headers import ( UNDEFINED_DEFAULT_SIMCORE_USER_AGENT_VALUE, X_SIMCORE_USER_AGENT, @@ -162,15 +166,21 @@ async def start_computation(request: web.Request) -> web.Response: return envelope_json_response(data, status_cls=web.HTTPCreated) except DirectorServiceError as exc: - return create_error_response( - exc, - reason=exc.reason, - http_error_cls=get_http_error(exc.status) or web.HTTPServiceUnavailable, + return exception_to_response( + create_http_error( + exc, + reason=exc.reason, + http_error_cls=get_http_error(exc.status) or web.HTTPServiceUnavailable, + ) ) except UserDefaultWalletNotFoundError as exc: - return create_error_response(exc, http_error_cls=web.HTTPNotFound) + return exception_to_response( + create_http_error(exc, 
http_error_cls=web.HTTPNotFound) + ) except WalletNotEnoughCreditsError as exc: - return create_error_response(exc, http_error_cls=web.HTTPPaymentRequired) + return exception_to_response( + create_http_error(exc, http_error_cls=web.HTTPPaymentRequired) + ) @routes.post(f"/{VTAG}/computations/{{project_id}}:stop", name="stop_computation") @@ -203,7 +213,7 @@ async def stop_computation(request: web.Request) -> web.Response: raise web.HTTPNoContent(content_type=MIMETYPE_APPLICATION_JSON) except DirectorServiceError as exc: - return create_error_response( + return create_http_error( exc, reason=exc.reason, http_error_cls=get_http_error(exc.status) or web.HTTPServiceUnavailable, @@ -252,10 +262,10 @@ async def get_computation(request: web.Request) -> web.Response: dumps=json_dumps, ) except DirectorServiceError as exc: - return create_error_response( + return create_http_error( exc, reason=exc.reason, http_error_cls=get_http_error(exc.status) or web.HTTPServiceUnavailable, ) except ValidationError as exc: - return create_error_response(exc, http_error_cls=web.HTTPInternalServerError) + return create_http_error(exc, http_error_cls=web.HTTPInternalServerError) diff --git a/services/web/server/tests/integration/01/test_garbage_collection.py b/services/web/server/tests/integration/01/test_garbage_collection.py index adf3849444d..9aac48842a6 100644 --- a/services/web/server/tests/integration/01/test_garbage_collection.py +++ b/services/web/server/tests/integration/01/test_garbage_collection.py @@ -87,7 +87,7 @@ async def __delete_all_redis_keys__(redis_settings: RedisSettings): decode_responses=True, ) await client.flushall() - await client.close(close_connection_pool=True) + await client.aclose(close_connection_pool=True) @pytest.fixture(scope="session") diff --git a/services/web/server/tests/integration/02/test_computation.py b/services/web/server/tests/integration/02/test_computation.py index 2531d5761f9..23fe812b9df 100644 --- 
a/services/web/server/tests/integration/02/test_computation.py +++ b/services/web/server/tests/integration/02/test_computation.py @@ -85,7 +85,7 @@ class _ExpectedResponseTuple(NamedTuple): ok: int created: int no_content: int - forbidden: int + conflict: int # pylint: disable=no-member def __str__(self) -> str: @@ -105,7 +105,7 @@ def standard_role_response(): ok=status.HTTP_401_UNAUTHORIZED, created=status.HTTP_401_UNAUTHORIZED, no_content=status.HTTP_401_UNAUTHORIZED, - forbidden=status.HTTP_401_UNAUTHORIZED, + conflict=status.HTTP_401_UNAUTHORIZED, ), ), pytest.param( @@ -114,7 +114,7 @@ def standard_role_response(): ok=status.HTTP_200_OK, created=status.HTTP_201_CREATED, no_content=status.HTTP_204_NO_CONTENT, - forbidden=status.HTTP_403_FORBIDDEN, + conflict=status.HTTP_409_CONFLICT, ), ), pytest.param( @@ -123,7 +123,7 @@ def standard_role_response(): ok=status.HTTP_200_OK, created=status.HTTP_201_CREATED, no_content=status.HTTP_204_NO_CONTENT, - forbidden=status.HTTP_403_FORBIDDEN, + conflict=status.HTTP_409_CONFLICT, ), ), pytest.param( @@ -132,7 +132,7 @@ def standard_role_response(): ok=status.HTTP_200_OK, created=status.HTTP_201_CREATED, no_content=status.HTTP_204_NO_CONTENT, - forbidden=status.HTTP_403_FORBIDDEN, + conflict=status.HTTP_409_CONFLICT, ), ), ], @@ -390,7 +390,7 @@ async def test_start_stop_computation( if not error: # starting again should be disallowed, since it's already running resp = await client.post(f"{url_start}") - assert resp.status == expected.forbidden + assert resp.status == expected.conflict assert "pipeline_id" in data assert data["pipeline_id"] == project_id diff --git a/services/web/server/tests/unit/with_dbs/conftest.py b/services/web/server/tests/unit/with_dbs/conftest.py index c4b5807d58f..1217e6f39b0 100644 --- a/services/web/server/tests/unit/with_dbs/conftest.py +++ b/services/web/server/tests/unit/with_dbs/conftest.py @@ -579,7 +579,7 @@ async def redis_locks_client( yield client await client.flushall() - await
client.close(close_connection_pool=True) + await client.aclose(close_connection_pool=True) # SOCKETS FIXTURES --------------------------------------------------------