diff --git a/packages/service-library/src/servicelib/rabbitmq/_rpc_router.py b/packages/service-library/src/servicelib/rabbitmq/_rpc_router.py index 15239012682d..af0c77f727c0 100644 --- a/packages/service-library/src/servicelib/rabbitmq/_rpc_router.py +++ b/packages/service-library/src/servicelib/rabbitmq/_rpc_router.py @@ -6,39 +6,62 @@ from typing import Any, TypeVar from models_library.rabbitmq_basic_types import RPCMethodName -from pydantic import SecretStr from ..logging_utils import log_context from ._errors import RPCServerError DecoratedCallable = TypeVar("DecoratedCallable", bound=Callable[..., Any]) +# NOTE: this is equivalent to http access logs _logger = logging.getLogger("rpc.access") -_RPC_CUSTOM_ENCODER: dict[Any, Callable[[Any], Any]] = { - SecretStr: SecretStr.get_secret_value -} + +def _create_func_msg(func, args: list[Any], kwargs: dict[str, Any]) -> str: + msg = f"{func.__name__}(" + + if args_msg := ", ".join(map(str, args)): + msg += args_msg + + if kwargs_msg := ", ".join({f"{name}={value}" for name, value in kwargs.items()}): + if args: + msg += ", " + msg += kwargs_msg + + return f"{msg})" @dataclass class RPCRouter: routes: dict[RPCMethodName, Callable] = field(default_factory=dict) - def expose(self) -> Callable[[DecoratedCallable], DecoratedCallable]: - def decorator(func: DecoratedCallable) -> DecoratedCallable: + def expose( + self, + *, + reraise_if_error_type: tuple[type[Exception], ...] | None = None, + ) -> Callable[[DecoratedCallable], DecoratedCallable]: + def _decorator(func: DecoratedCallable) -> DecoratedCallable: @functools.wraps(func) - async def wrapper(*args, **kwargs): + async def _wrapper(*args, **kwargs): + with log_context( + # NOTE: this is intentionally analogous to the http access log traces. + # To change log-level use getLogger("rpc.access").set_level(...) _logger, logging.INFO, - msg=f"calling {func.__name__} with {args}, {kwargs}", + msg=f"RPC call {_create_func_msg(func, args, kwargs)}", + log_duration=True, ): try: return await func(*args, **kwargs) + except asyncio.CancelledError: _logger.debug("call was cancelled") raise + except Exception as exc: # pylint: disable=broad-except + if reraise_if_error_type and type(exc) in reraise_if_error_type: + raise + _logger.exception("Unhandled exception:") # NOTE: we do not return internal exceptions over RPC raise RPCServerError( @@ -47,7 +70,7 @@ async def wrapper(*args, **kwargs): msg=f"{exc}", ) from None - self.routes[RPCMethodName(func.__name__)] = wrapper + self.routes[RPCMethodName(func.__name__)] = _wrapper return func - return decorator + return _decorator diff --git a/services/payments/tests/unit/test_services_auto_recharge_listener.py b/services/payments/tests/unit/test_services_auto_recharge_listener.py index c6069fb5603b..e7f3a1dac09a 100644 --- a/services/payments/tests/unit/test_services_auto_recharge_listener.py +++ b/services/payments/tests/unit/test_services_auto_recharge_listener.py @@ -4,10 +4,9 @@ # pylint: disable=unused-argument # pylint: disable=unused-variable -from collections.abc import Callable +from collections.abc import Awaitable, Callable, Iterator from datetime import datetime, timedelta, timezone from decimal import Decimal -from typing import Awaitable, Iterator from unittest import mock import pytest