Skip to content

Commit

Permalink
reraise errors in prc
Browse files Browse the repository at this point in the history
  • Loading branch information
pcrespov committed Dec 11, 2023
1 parent 01a5ab6 commit f25e08b
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 12 deletions.
43 changes: 33 additions & 10 deletions packages/service-library/src/servicelib/rabbitmq/_rpc_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,39 +6,62 @@
from typing import Any, TypeVar

from models_library.rabbitmq_basic_types import RPCMethodName
from pydantic import SecretStr

from ..logging_utils import log_context
from ._errors import RPCServerError

DecoratedCallable = TypeVar("DecoratedCallable", bound=Callable[..., Any])

# NOTE: this is equivalent to http access logs
_logger = logging.getLogger("rpc.access")

_RPC_CUSTOM_ENCODER: dict[Any, Callable[[Any], Any]] = {
SecretStr: SecretStr.get_secret_value
}

def _create_func_msg(func, args: list[Any], kwargs: dict[str, Any]) -> str:
msg = f"{func.__name__}("

if args_msg := ", ".join(map(str, args)):
msg += args_msg

if kwargs_msg := ", ".join({f"{name}={value}" for name, value in kwargs.items()}):
if args:
msg += ", "
msg += kwargs_msg

return f"{msg})"


@dataclass
class RPCRouter:
routes: dict[RPCMethodName, Callable] = field(default_factory=dict)

def expose(self) -> Callable[[DecoratedCallable], DecoratedCallable]:
def decorator(func: DecoratedCallable) -> DecoratedCallable:
def expose(
self,
*,
reraise_if_error_type: tuple[type[Exception], ...] | None = None,
) -> Callable[[DecoratedCallable], DecoratedCallable]:
def _decorator(func: DecoratedCallable) -> DecoratedCallable:
@functools.wraps(func)
async def wrapper(*args, **kwargs):
async def _wrapper(*args, **kwargs):

with log_context(
# NOTE: this is intentionally analogous to the http access log traces.
# To change log-level use getLogger("rpc.access").set_level(...)
_logger,
logging.INFO,
msg=f"calling {func.__name__} with {args}, {kwargs}",
msg=f"RPC call {_create_func_msg(func, args, kwargs)}",
log_duration=True,
):
try:
return await func(*args, **kwargs)

except asyncio.CancelledError:
_logger.debug("call was cancelled")
raise

except Exception as exc: # pylint: disable=broad-except
if reraise_if_error_type and type(exc) in reraise_if_error_type:
raise

_logger.exception("Unhandled exception:")
# NOTE: we do not return internal exceptions over RPC
raise RPCServerError(
Expand All @@ -47,7 +70,7 @@ async def wrapper(*args, **kwargs):
msg=f"{exc}",
) from None

self.routes[RPCMethodName(func.__name__)] = wrapper
self.routes[RPCMethodName(func.__name__)] = _wrapper
return func

return decorator
return _decorator
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,9 @@
# pylint: disable=unused-argument
# pylint: disable=unused-variable

from collections.abc import Callable
from collections.abc import Awaitable, Callable, Iterator
from datetime import datetime, timedelta, timezone
from decimal import Decimal
from typing import Awaitable, Iterator
from unittest import mock

import pytest
Expand Down

0 comments on commit f25e08b

Please sign in to comment.