Skip to content

Commit

Permalink
Merge branch 'master' into enh/build-frontend
Browse files Browse the repository at this point in the history
  • Loading branch information
odeimaiz authored Jun 26, 2024
2 parents 12ec24a + b5d82e0 commit 2d3db6a
Show file tree
Hide file tree
Showing 26 changed files with 536 additions and 156 deletions.
19 changes: 6 additions & 13 deletions api/specs/web-server/_computations.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,23 +29,16 @@ async def get_computation(project_id: ProjectID):
"/computations/{project_id}:start",
response_model=Envelope[_ComputationStarted],
responses={
status.HTTP_404_NOT_FOUND: {
"description": "Project/wallet/pricing details not found"
},
status.HTTP_402_PAYMENT_REQUIRED: {
"description": "Insufficient osparc credits"
},
status.HTTP_406_NOT_ACCEPTABLE: {
"description": "Cluster not found",
"description": "Insufficient credits to run computation"
},
status.HTTP_503_SERVICE_UNAVAILABLE: {
"description": "Service not available",
},
status.HTTP_422_UNPROCESSABLE_ENTITY: {
"description": "Configuration error",
status.HTTP_404_NOT_FOUND: {
"description": "Project/wallet/pricing details were not found"
},
status.HTTP_402_PAYMENT_REQUIRED: {"description": "Payment required"},
status.HTTP_406_NOT_ACCEPTABLE: {"description": "Cluster not found"},
status.HTTP_409_CONFLICT: {"description": "Project already started"},
status.HTTP_422_UNPROCESSABLE_ENTITY: {"description": "Configuration error"},
status.HTTP_503_SERVICE_UNAVAILABLE: {"description": "Service not available"},
},
)
async def start_computation(project_id: ProjectID, _start: ComputationStart):
Expand Down
6 changes: 3 additions & 3 deletions packages/pytest-simcore/src/pytest_simcore/redis_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ async def redis_client(
yield client

await client.flushall()
await client.close(close_connection_pool=True)
await client.aclose(close_connection_pool=True)


@pytest.fixture()
Expand All @@ -86,7 +86,7 @@ async def redis_locks_client(
yield client

await client.flushall()
await client.close(close_connection_pool=True)
await client.aclose(close_connection_pool=True)


@tenacity.retry(
Expand All @@ -103,4 +103,4 @@ async def wait_till_redis_responsive(redis_url: URL | str) -> None:
msg = f"{redis_url=} not available"
raise ConnectionError(msg)
finally:
await client.close(close_connection_pool=True)
await client.aclose(close_connection_pool=True)
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from .rest_models import ErrorItemType, ErrorType, LogMessageType
from .rest_responses import (
create_data_response,
create_error_response,
create_http_error,
is_enveloped_from_map,
is_enveloped_from_text,
wrap_as_envelope,
Expand All @@ -44,7 +44,7 @@ def error_middleware_factory(
_is_prod: bool = is_production_environ()

def _process_and_raise_unexpected_error(request: web.BaseRequest, err: Exception):
resp = create_error_response(
http_error = create_http_error(
err,
"Unexpected Server error",
web.HTTPInternalServerError,
Expand All @@ -58,11 +58,11 @@ def _process_and_raise_unexpected_error(request: web.BaseRequest, err: Exception
request.remote,
request.method,
request.path,
resp.status,
http_error.status,
exc_info=err,
stack_info=True,
)
raise resp
raise http_error

@web.middleware
async def _middleware_handler(request: web.Request, handler: Handler):
Expand Down Expand Up @@ -115,22 +115,22 @@ async def _middleware_handler(request: web.Request, handler: Handler):
raise

except NotImplementedError as err:
error_response = create_error_response(
http_error = create_http_error(
err,
f"{err}",
web.HTTPNotImplemented,
skip_internal_error_details=_is_prod,
)
raise error_response from err
raise http_error from err

except asyncio.TimeoutError as err:
error_response = create_error_response(
http_error = create_http_error(
err,
f"{err}",
web.HTTPGatewayTimeout,
skip_internal_error_details=_is_prod,
)
raise error_response from err
raise http_error from err

except Exception as err: # pylint: disable=broad-except
_process_and_raise_unexpected_error(request, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,4 @@ class ErrorType:
logs: list[LogMessageType] = field(default_factory=list)
errors: list[ErrorItemType] = field(default_factory=list)
status: int = 400
message: str = "Unexpected client error"
message: str = "Unexpected error"
38 changes: 29 additions & 9 deletions packages/service-library/src/servicelib/aiohttp/rest_responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from servicelib.aiohttp.status import HTTP_200_OK

from ..mimetype_constants import MIMETYPE_APPLICATION_JSON
from ..status_utils import get_code_description
from .rest_models import ErrorItemType, ErrorType

_ENVELOPE_KEYS = ("data", "error")
Expand Down Expand Up @@ -65,18 +66,20 @@ def create_data_response(

response = web.json_response(payload, dumps=json_dumps, status=status)
except (TypeError, ValueError) as err:
response = create_error_response(
[
err,
],
str(err),
web.HTTPInternalServerError,
skip_internal_error_details=skip_internal_error_details,
response = exception_to_response(
create_http_error(
[
err,
],
str(err),
web.HTTPInternalServerError,
skip_internal_error_details=skip_internal_error_details,
)
)
return response


def create_error_response(
def create_http_error(
errors: list[Exception] | Exception,
reason: str | None = None,
http_error_cls: type[HTTPError] = web.HTTPInternalServerError,
Expand All @@ -94,18 +97,23 @@ def create_error_response(
# TODO: guarantee no throw!

is_internal_error: bool = http_error_cls == web.HTTPInternalServerError
default_message = reason or get_code_description(http_error_cls.status_code)

if is_internal_error and skip_internal_error_details:
error = ErrorType(
errors=[],
status=http_error_cls.status_code,
message=default_message,
)
else:
items = [ErrorItemType.from_error(err) for err in errors]
error = ErrorType(
errors=[ErrorItemType.from_error(err) for err in errors],
errors=items,
status=http_error_cls.status_code,
message=items[0].message if items else default_message,
)

assert not http_error_cls.empty_body # nosec
payload = wrap_as_envelope(error=asdict(error))

return http_error_cls(
Expand All @@ -115,6 +123,18 @@ def create_error_response(
)


def exception_to_response(exc: HTTPError) -> web.Response:
    """Convert an aiohttp HTTPError into a plain ``web.Response``.

    Returning ``web.HTTPException`` instances from handlers is deprecated,
    so this helper builds an equivalent response object that can be
    returned instead.
    SEE https://github.com/aio-libs/aiohttp/issues/2415
    """
    response = web.Response(
        status=exc.status,
        reason=exc.reason,
        headers=exc.headers,
        text=exc.text,
    )
    return response


# Inverse map from code to HTTPException classes
def _collect_http_exceptions(exception_cls: type[HTTPException] = HTTPException):
def _pred(obj) -> bool:
Expand Down
148 changes: 147 additions & 1 deletion packages/service-library/src/servicelib/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,24 @@
In order to avoid cyclic dependencies, please
DO NOT IMPORT ANYTHING from .
"""

import asyncio
import logging
import os
import socket
from collections.abc import Awaitable, Coroutine, Generator, Iterable
from pathlib import Path
from typing import Any, Final, cast
from typing import Any, AsyncGenerator, AsyncIterable, Final, TypeVar, cast

import toolz
from pydantic import NonNegativeInt

_logger = logging.getLogger(__name__)

_DEFAULT_GATHER_TASKS_GROUP_PREFIX: Final[str] = "gathered"
_DEFAULT_LOGGER: Final[logging.Logger] = _logger
_DEFAULT_LIMITED_CONCURRENCY: Final[int] = 1


def is_production_environ() -> bool:
"""
Expand Down Expand Up @@ -175,3 +180,144 @@ def unused_port() -> int:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(("127.0.0.1", 0))
return cast(int, s.getsockname()[1])


T = TypeVar("T")


async def limited_as_completed(
    awaitables: Iterable[Awaitable[T]] | AsyncIterable[Awaitable[T]],
    *,
    limit: int = _DEFAULT_LIMITED_CONCURRENCY,
    tasks_group_prefix: str | None = None,
) -> AsyncGenerator[asyncio.Future[T], None]:
    """Runs awaitables using limited concurrent tasks and returns
    result futures unordered.

    Arguments:
        awaitables -- The awaitables to limit the concurrency of.
            May be a plain iterable or an async iterable.

    Keyword Arguments:
        limit -- The maximum number of awaitables to run concurrently.
            0 or negative values disables the limit. (default: {1})
        tasks_group_prefix -- The prefix to use for the name of the asyncio tasks group.
            If None, no name is used. (default: {None})

    Returns:
        nothing

    Yields:
        Future[T]: the future of the awaitables as they appear.
    """
    # Accept either a sync or an async iterable: try aiter() first and fall
    # back to iter() when the input does not implement __aiter__.
    try:
        awaitable_iterator = aiter(awaitables)  # type: ignore[arg-type]
        is_async = True
    except TypeError:
        assert isinstance(awaitables, Iterable)  # nosec
        awaitable_iterator = iter(awaitables)  # type: ignore[assignment]
        is_async = False

    completed_all_awaitables = False
    pending_futures: set[asyncio.Future] = set()

    try:
        # Keep going until the input is exhausted AND all scheduled tasks
        # have been yielded.
        while pending_futures or not completed_all_awaitables:
            # Top up the pending set until the concurrency limit is reached
            # (limit < 1 disables the cap) or the input runs out.
            while (
                limit < 1 or len(pending_futures) < limit
            ) and not completed_all_awaitables:
                try:
                    aw = (
                        await anext(awaitable_iterator)
                        if is_async
                        else next(awaitable_iterator)  # type: ignore[call-overload]
                    )
                    future = asyncio.ensure_future(aw)
                    if tasks_group_prefix:
                        # prefix the auto-generated task name for debugging
                        future.set_name(f"{tasks_group_prefix}-{future.get_name()}")
                    pending_futures.add(future)
                except (StopIteration, StopAsyncIteration):  # noqa: PERF203
                    completed_all_awaitables = True
            if not pending_futures:
                return
            # Block until at least one task finishes, then yield every
            # completed future; the rest stay pending for the next round.
            done, pending_futures = await asyncio.wait(
                pending_futures, return_when=asyncio.FIRST_COMPLETED
            )

            for future in done:
                yield future
    except asyncio.CancelledError:
        # The generator itself was cancelled: cancel whatever is still
        # running and wait for the cancellations to settle before
        # propagating, so no task is left dangling.
        for future in pending_futures:
            future.cancel()
        await asyncio.gather(*pending_futures, return_exceptions=True)
        raise


async def _wrapped(
awaitable: Awaitable[T], *, index: int, reraise: bool, logger: logging.Logger
) -> tuple[int, T | BaseException]:
try:
return index, await awaitable
except asyncio.CancelledError:
logger.debug(
"Cancelled %i-th concurrent task %s",
index + 1,
f"{awaitable=}",
)
raise
except BaseException as exc: # pylint: disable=broad-exception-caught
logger.warning(
"Error in %i-th concurrent task %s: %s",
index + 1,
f"{awaitable=}",
f"{exc=}",
)
if reraise:
raise
return index, exc


async def limited_gather(
    *awaitables: Awaitable[T],
    reraise: bool = True,
    log: logging.Logger = _DEFAULT_LOGGER,
    limit: int = _DEFAULT_LIMITED_CONCURRENCY,
    tasks_group_prefix: str | None = None,
) -> list[T | BaseException | None]:
    """Run all *awaitables* with limited concurrency, preserving input order.

    Arguments:
        awaitables -- The awaitables to limit the concurrency of.

    Keyword Arguments:
        limit -- The maximum number of awaitables to run concurrently.
            setting 0 or negative values disable (default: {1})
        reraise -- if True will raise at the first exception
            The remaining tasks will continue as in standard asyncio gather.
            If False, then the exceptions will be returned (default: {True})
        log -- the logger to use for logging the exceptions (default: {_logger})
        tasks_group_prefix -- The prefix to use for the name of the asyncio tasks group.
            If None, 'gathered' prefix is used. (default: {None})

    Returns:
        the results of the awaitables keeping the order

    special thanks to: https://death.andgravity.com/limit-concurrency
    """
    # Wrap each awaitable so its outcome carries the original position.
    wrapped = [
        _wrapped(aw, reraise=reraise, index=pos, logger=log)
        for pos, aw in enumerate(awaitables)
    ]

    # Pre-size the result list; slots are filled as tasks complete.
    ordered_results: list[T | BaseException | None] = [None for _ in wrapped]
    prefix = tasks_group_prefix or _DEFAULT_GATHER_TASKS_GROUP_PREFIX
    async for done_future in limited_as_completed(
        wrapped, limit=limit, tasks_group_prefix=prefix
    ):
        pos, outcome = await done_future
        ordered_results[pos] = outcome

    return ordered_results
Loading

0 comments on commit 2d3db6a

Please sign in to comment.