diff --git a/README.md b/README.md index fab78596..37887ee2 100644 --- a/README.md +++ b/README.md @@ -128,6 +128,51 @@ def on_message_received(msg: rtc.ChatMessage): await chat.send_message("hello world") ``` + +### RPC + +Perform your own predefined method calls from one participant to another. + +This feature is especially powerful when used with [Agents](https://docs.livekit.io/agents), for instance to forward LLM function calls to your client application. + +#### Registering an RPC method + +The participant who implements the method and will receive its calls must first register support: + +```python +@room.local_participant.register_rpc_method("greet") +async def handle_greet(data: RpcInvocationData): + print(f"Received greeting from {data.caller_identity}: {data.payload}") + return f"Hello, {data.caller_identity}!" +``` + +In addition to the payload, your handler will also receive `response_timeout`, which informs you the maximum time available to return a response. If you are unable to respond in time, the call will result in an error on the caller's side. + +#### Performing an RPC request + +The caller may then initiate an RPC call like so: + +```python +try: + response = await room.local_participant.perform_rpc( + destination_identity='recipient-identity', + method='greet', + payload='Hello from RPC!' + ) + print(f"RPC response: {response}") +except Exception as e: + print(f"RPC call failed: {e}") +``` + +You may find it useful to adjust the `response_timeout` parameter, which indicates the amount of time you will wait for a response. We recommend keeping this value as low as possible while still satisfying the constraints of your application. + +#### Errors + +LiveKit is a dynamic realtime environment and calls can fail for various reasons. + +You may throw errors of the type `RpcError` with a string `message` in an RPC method handler and they will be received on the caller's side with the message intact. Other errors will not be transmitted and will instead arrive to the caller as `1500` ("Application Error"). Other built-in errors are detailed in `RpcError`. + + ## Examples - [Facelandmark](https://github.com/livekit/python-sdks/tree/main/examples/face_landmark): Use mediapipe to detect face landmarks (eyes, nose ...) diff --git a/examples/rpc.py b/examples/rpc.py new file mode 100644 index 00000000..ac1ce337 --- /dev/null +++ b/examples/rpc.py @@ -0,0 +1,287 @@ +from livekit import rtc, api +import os +import json +import asyncio +from dotenv import load_dotenv +from livekit.rtc.rpc import RpcInvocationData + +load_dotenv(dotenv_path=".env.local", override=False) +LIVEKIT_API_KEY = os.getenv("LIVEKIT_API_KEY") +LIVEKIT_API_SECRET = os.getenv("LIVEKIT_API_SECRET") +LIVEKIT_URL = os.getenv("LIVEKIT_URL") +if not LIVEKIT_API_KEY or not LIVEKIT_API_SECRET or not LIVEKIT_URL: + raise ValueError( + "Missing required environment variables. Please check your .env.local file." + ) + + +async def main(): + rooms = [] # Keep track of all rooms for cleanup + try: + room_name = f"rpc-test-{os.urandom(4).hex()}" + print(f"Connecting participants to room: {room_name}") + + callers_room, greeters_room, math_genius_room = await asyncio.gather( + connect_participant("caller", room_name), + connect_participant("greeter", room_name), + connect_participant("math-genius", room_name), + ) + rooms = [callers_room, greeters_room, math_genius_room] + + register_receiver_methods(greeters_room, math_genius_room) + + try: + print("\n\nRunning greeting example...") + await asyncio.gather(perform_greeting(callers_room)) + except Exception as error: + print("Error:", error) + + try: + print("\n\nRunning error handling example...") + await perform_divide(callers_room) + except Exception as error: + print("Error:", error) + + try: + print("\n\nRunning math example...") + await perform_square_root(callers_room) + await asyncio.sleep(2) + await perform_quantum_hypergeometric_series(callers_room) + except Exception as error: + print("Error:", error) + + try: + print("\n\nRunning long calculation with timeout...") + await asyncio.create_task(perform_long_calculation(callers_room)) + except Exception as error: + print("Error:", error) + + try: + print("\n\nRunning long calculation with disconnect...") + # Start the long calculation + long_calc_task = asyncio.create_task(perform_long_calculation(callers_room)) + # Wait a bit then disconnect the math genius + await asyncio.sleep(5) + print("\nDisconnecting math genius early...") + await math_genius_room.disconnect() + # Wait for the calculation to fail + await long_calc_task + except Exception as error: + print("Error:", error) + + print("\n\nParticipants done, disconnecting remaining participants...") + await callers_room.disconnect() + await greeters_room.disconnect() + + print("Participants disconnected. Example completed.") + + except KeyboardInterrupt: + print("\nReceived interrupt signal, cleaning up...") + except Exception as e: + print(f"Unexpected error: {e}") + finally: + # Clean up all rooms + print("Disconnecting all participants...") + await asyncio.gather( + *(room.disconnect() for room in rooms), return_exceptions=True + ) + print("Cleanup complete") + + +def register_receiver_methods(greeters_room: rtc.Room, math_genius_room: rtc.Room): + @greeters_room.local_participant.register_rpc_method("arrival") + async def arrival_method( + data: RpcInvocationData, + ): + print(f'[Greeter] Oh {data.caller_identity} arrived and said "{data.payload}"') + await asyncio.sleep(2) + return "Welcome and have a wonderful day!" + + @math_genius_room.local_participant.register_rpc_method("square-root") + async def square_root_method( + data: RpcInvocationData, + ): + json_data = json.loads(data.payload) + number = json_data["number"] + print( + f"[Math Genius] I guess {data.caller_identity} wants the square root of {number}. I've only got {data.response_timeout} seconds to respond but I think I can pull it off." + ) + + print("[Math Genius] *doing math*…") + await asyncio.sleep(2) + + result = number**0.5 + print(f"[Math Genius] Aha! It's {result}") + return json.dumps({"result": result}) + + @math_genius_room.local_participant.register_rpc_method("divide") + async def divide_method( + data: RpcInvocationData, + ): + json_data = json.loads(data.payload) + dividend = json_data["dividend"] + divisor = json_data["divisor"] + print( + f"[Math Genius] {data.caller_identity} wants to divide {dividend} by {divisor}." + ) + + result = dividend / divisor + return json.dumps({"result": result}) + + @math_genius_room.local_participant.register_rpc_method("long-calculation") + async def long_calculation_method( + data: RpcInvocationData, + ): + print( + f"[Math Genius] Starting a very long calculation for {data.caller_identity}" + ) + print( + f"[Math Genius] This will take 30 seconds even though you're only giving me {data.response_timeout} seconds" + ) + await asyncio.sleep(30) + return json.dumps({"result": "Calculation complete!"}) + + +async def perform_greeting(room: rtc.Room): + print("[Caller] Letting the greeter know that I've arrived") + try: + response = await room.local_participant.perform_rpc( + destination_identity="greeter", method="arrival", payload="Hello" + ) + print(f'[Caller] That\'s nice, the greeter said: "{response}"') + except Exception as error: + print(f"[Caller] RPC call failed: {error}") + raise + + +async def perform_square_root(room: rtc.Room): + print("[Caller] What's the square root of 16?") + try: + response = await room.local_participant.perform_rpc( + destination_identity="math-genius", + method="square-root", + payload=json.dumps({"number": 16}), + ) + parsed_response = json.loads(response) + print(f"[Caller] Nice, the answer was {parsed_response['result']}") + except Exception as error: + print(f"[Caller] RPC call failed: {error}") + raise + + +async def perform_quantum_hypergeometric_series(room: rtc.Room): + print("[Caller] What's the quantum hypergeometric series of 42?") + try: + response = await room.local_participant.perform_rpc( + destination_identity="math-genius", + method="quantum-hypergeometric-series", + payload=json.dumps({"number": 42}), + ) + parsed_response = json.loads(response) + print(f"[Caller] genius says {parsed_response['result']}!") + except rtc.RpcError as error: + if error.code == rtc.RpcError.ErrorCode.UNSUPPORTED_METHOD: + print("[Caller] Aww looks like the genius doesn't know that one.") + return + print("[Caller] Unexpected error:", error) + raise + except Exception as error: + print("[Caller] Unexpected error:", error) + raise + + +async def perform_divide(room: rtc.Room): + print("[Caller] Let's divide 10 by 0.") + try: + response = await room.local_participant.perform_rpc( + destination_identity="math-genius", + method="divide", + payload=json.dumps({"dividend": 10, "divisor": 0}), + ) + parsed_response = json.loads(response) + print(f"[Caller] The result is {parsed_response['result']}") + except rtc.RpcError as error: + if error.code == rtc.RpcError.ErrorCode.APPLICATION_ERROR: + print( + "[Caller] Aww something went wrong with that one, lets try something else." + ) + else: + print(f"[Caller] RPC call failed with unexpected RpcError: {error}") + except Exception as error: + print(f"[Caller] RPC call failed with unexpected error: {error}") + + +async def perform_long_calculation(room: rtc.Room): + print("[Caller] Giving the math genius 10s to complete a long calculation") + try: + response = await room.local_participant.perform_rpc( + destination_identity="math-genius", + method="long-calculation", + payload=json.dumps({}), + response_timeout=10, + ) + parsed_response = json.loads(response) + print(f"[Caller] Result: {parsed_response['result']}") + except rtc.RpcError as error: + if error.code == rtc.RpcError.ErrorCode.RESPONSE_TIMEOUT: + print("[Caller] Math genius took too long to respond") + elif error.code == rtc.RpcError.ErrorCode.RECIPIENT_DISCONNECTED: + print("[Caller] Math genius disconnected before response was received") + else: + print(f"[Caller] Unexpected RPC error: {error}") + except Exception as error: + print(f"[Caller] Unexpected error: {error}") + + +def create_token(identity: str, room_name: str): + token = ( + api.AccessToken(LIVEKIT_API_KEY, LIVEKIT_API_SECRET) + .with_identity(identity) + .with_grants( + api.VideoGrants( + room=room_name, + room_join=True, + can_publish=True, + can_subscribe=True, + ) + ) + ) + return token.to_jwt() + + +async def connect_participant(identity: str, room_name: str) -> rtc.Room: + room = rtc.Room() + token = create_token(identity, room_name) + + def on_disconnected(reason: str): + print(f"[{identity}] Disconnected from room: {reason}") + + room.on("disconnected", on_disconnected) + + await room.connect(LIVEKIT_URL, token) + + async def wait_for_participants(): + if room.remote_participants: + return + participant_connected = asyncio.Event() + + def _on_participant_connected(participant: rtc.RemoteParticipant): + room.off("participant_connected", _on_participant_connected) + participant_connected.set() + + room.on("participant_connected", _on_participant_connected) + await participant_connected.wait() + + try: + await asyncio.wait_for(wait_for_participants(), timeout=5.0) + except asyncio.TimeoutError: + raise TimeoutError("Timed out waiting for participants") + + return room + + +if __name__ == "__main__": + try: + asyncio.run(main()) + except KeyboardInterrupt: + print("\nProgram terminated by user") diff --git a/livekit-rtc/livekit/rtc/__init__.py b/livekit-rtc/livekit/rtc/__init__.py index 4bac0f6f..61c2ae51 100644 --- a/livekit-rtc/livekit/rtc/__init__.py +++ b/livekit-rtc/livekit/rtc/__init__.py @@ -72,6 +72,7 @@ from .video_stream import VideoFrameEvent, VideoStream from .audio_resampler import AudioResampler, AudioResamplerQuality from .utils import combine_audio_frames +from .rpc import RpcError, RpcInvocationData __all__ = [ "ConnectionQuality", @@ -132,6 +133,8 @@ "ChatMessage", "AudioResampler", "AudioResamplerQuality", + "RpcError", + "RpcInvocationData", "EventEmitter", "combine_audio_frames", "__version__", diff --git a/livekit-rtc/livekit/rtc/participant.py b/livekit-rtc/livekit/rtc/participant.py index 6377b8e1..779e0212 100644 --- a/livekit-rtc/livekit/rtc/participant.py +++ b/livekit-rtc/livekit/rtc/participant.py @@ -15,7 +15,7 @@ from __future__ import annotations import ctypes -from typing import List, Mapping, Union +from typing import List, Union, Callable, Dict, Awaitable, Optional, Mapping, cast from abc import abstractmethod, ABC from ._ffi_client import FfiClient, FfiHandle @@ -35,6 +35,12 @@ TrackPublication, ) from .transcription import Transcription +from .rpc import RpcError +from ._proto.rpc_pb2 import RpcMethodInvocationResponseRequest +from .log import logger +import asyncio + +from .rpc import RpcInvocationData class PublishTrackError(Exception): @@ -113,6 +119,9 @@ def __init__( super().__init__(owned_info) self._room_queue = room_queue self._track_publications: dict[str, LocalTrackPublication] = {} # type: ignore + self._rpc_handlers: Dict[ + str, Callable[[RpcInvocationData], Union[Awaitable[str], str]] + ] = {} @property def track_publications(self) -> Mapping[str, LocalTrackPublication]: @@ -236,6 +245,196 @@ async def publish_transcription(self, transcription: Transcription) -> None: if cb.publish_transcription.error: raise PublishTranscriptionError(cb.publish_transcription.error) + async def perform_rpc( + self, + *, + destination_identity: str, + method: str, + payload: str, + response_timeout: Optional[float] = None, + ) -> str: + """ + Initiate an RPC call to a remote participant. + + Args: + destination_identity (str): The `identity` of the destination participant + method (str): The method name to call + payload (str): The method payload + response_timeout (Optional[float]): Timeout for receiving a response after initial connection + + Returns: + str: The response payload + + Raises: + RpcError: On failure. Details in `message`. + """ + req = proto_ffi.FfiRequest() + req.perform_rpc.local_participant_handle = self._ffi_handle.handle + req.perform_rpc.destination_identity = destination_identity + req.perform_rpc.method = method + req.perform_rpc.payload = payload + if response_timeout is not None: + req.perform_rpc.response_timeout_ms = int(response_timeout * 1000) + + queue = FfiClient.instance.queue.subscribe() + try: + resp = FfiClient.instance.request(req) + cb = await queue.wait_for( + lambda e: (e.perform_rpc.async_id == resp.perform_rpc.async_id) + ) + finally: + FfiClient.instance.queue.unsubscribe(queue) + + if cb.perform_rpc.HasField("error"): + raise RpcError._from_proto(cb.perform_rpc.error) + + return cb.perform_rpc.payload + + def register_rpc_method( + self, + method_name: str, + handler: Optional[ + Callable[[RpcInvocationData], Union[Awaitable[str], str]] + ] = None, + ) -> Union[None, Callable]: + """ + Establishes the participant as a receiver for calls of the specified RPC method. + Can be used either as a decorator or a regular method. + + The handler will recieve one argument of type `RpcInvocationData` and should return a string response which will be forwarded back to the caller. + + The handler may be synchronous or asynchronous. + + If unable to respond within `response_timeout`, the caller will hang up and receive an error on their side. + + You may raise errors of type `RpcError` in the handler, and they will be forwarded to the caller. + + Other errors raised in your handler will be caught and forwarded to the caller as "1500 Application Error". + + Args: + method_name (str): The name of the indicated RPC method. + handler (Optional[Callable]): Handler to be invoked whenever an RPC request for this method is received. Omit this argument to use the decorator syntax. + + Returns: + None (when used as a decorator it returns the decorator function) + + Example: + # As a decorator: + @room.local_participant.register_rpc_method("greet") + async def greet_handler(data: RpcInvocationData) -> str: + print(f"Received greeting from {data.caller_identity}: {data.payload}") + return f"Hello, {data.caller_identity}!" + + # As a regular method: + async def greet_handler(data: RpcInvocationData) -> str: + print(f"Received greeting from {data.caller_identity}: {data.payload}") + return f"Hello, {data.caller_identity}!" + + room.local_participant.register_rpc_method('greet', greet_handler) + """ + + def register(handler_func): + self._rpc_handlers[method_name] = handler_func + req = proto_ffi.FfiRequest() + req.register_rpc_method.local_participant_handle = self._ffi_handle.handle + req.register_rpc_method.method = method_name + FfiClient.instance.request(req) + + if handler is not None: + register(handler) + return None + else: + # Called as a decorator + return register + + def unregister_rpc_method(self, method: str) -> None: + """ + Unregisters a previously registered RPC method. + + Args: + method (str): The name of the RPC method to unregister + """ + self._rpc_handlers.pop(method, None) + + req = proto_ffi.FfiRequest() + req.unregister_rpc_method.local_participant_handle = self._ffi_handle.handle + req.unregister_rpc_method.method = method + + FfiClient.instance.request(req) + + async def _handle_rpc_method_invocation( + self, + invocation_id: int, + method: str, + request_id: str, + caller_identity: str, + payload: str, + response_timeout: float, + ) -> None: + response_error: Optional[RpcError] = None + response_payload: Optional[str] = None + + params = RpcInvocationData( + request_id, caller_identity, payload, response_timeout + ) + + handler = self._rpc_handlers.get(method) + + if not handler: + response_error = RpcError._built_in(RpcError.ErrorCode.UNSUPPORTED_METHOD) + else: + try: + if asyncio.iscoroutinefunction(handler): + async_handler = cast( + Callable[[RpcInvocationData], Awaitable[str]], handler + ) + + async def run_handler(): + try: + return await async_handler(params) + except asyncio.CancelledError: + # This will be caught by the outer try-except if it's due to timeout + raise + + try: + response_payload = await asyncio.wait_for( + run_handler(), timeout=response_timeout + ) + except asyncio.TimeoutError: + raise RpcError._built_in(RpcError.ErrorCode.RESPONSE_TIMEOUT) + except asyncio.CancelledError: + raise RpcError._built_in( + RpcError.ErrorCode.RECIPIENT_DISCONNECTED + ) + else: + sync_handler = cast(Callable[[RpcInvocationData], str], handler) + response_payload = sync_handler(params) + except RpcError as error: + response_error = error + except Exception as error: + logger.exception( + f"Uncaught error returned by RPC handler for {method}. Returning APPLICATION_ERROR instead. Original error: {error}", + ) + response_error = RpcError._built_in( + RpcError.ErrorCode.APPLICATION_ERROR + ) + + req = proto_ffi.FfiRequest( + rpc_method_invocation_response=RpcMethodInvocationResponseRequest( + local_participant_handle=self._ffi_handle.handle, + invocation_id=invocation_id, + error=response_error._to_proto() if response_error else None, + payload=response_payload, + ) + ) + + res = FfiClient.instance.request(req) + + if res.rpc_method_invocation_response.error: + logger.exception( + f"error sending rpc method invocation response: {res.rpc_method_invocation_response.error}" + ) + async def set_metadata(self, metadata: str) -> None: """ Set the metadata for the local participant. diff --git a/livekit-rtc/livekit/rtc/room.py b/livekit-rtc/livekit/rtc/room.py index 80a76e60..e35d2bbc 100644 --- a/livekit-rtc/livekit/rtc/room.py +++ b/livekit-rtc/livekit/rtc/room.py @@ -26,6 +26,7 @@ from ._proto import room_pb2 as proto_room from ._proto.room_pb2 import ConnectionState from ._proto.track_pb2 import TrackKind +from ._proto.rpc_pb2 import RpcMethodInvocationEvent from ._utils import BroadcastQueue from .e2ee import E2EEManager, E2EEOptions from .participant import LocalParticipant, Participant, RemoteParticipant @@ -130,6 +131,7 @@ def __init__(self, loop: Optional[asyncio.AbstractEventLoop] = None) -> None: self._loop = loop or asyncio.get_event_loop() self._room_queue = BroadcastQueue[proto_ffi.FfiEvent]() self._info = proto_room.RoomInfo() + self._rpc_invocation_tasks: set[asyncio.Task] = set() self._remote_participants: Dict[str, RemoteParticipant] = {} self._connection_state = ConnectionState.CONN_DISCONNECTED @@ -399,9 +401,10 @@ async def disconnect(self) -> None: if not self.isconnected(): return + await self._drain_rpc_invocation_tasks() + req = proto_ffi.FfiRequest() req.disconnect.room_handle = self._ffi_handle.handle # type: ignore - queue = FfiClient.instance.queue.subscribe() try: resp = FfiClient.instance.request(req) @@ -410,7 +413,6 @@ async def disconnect(self) -> None: ) finally: FfiClient.instance.queue.unsubscribe(queue) - await self._task FfiClient.instance.queue.unsubscribe(self._ffi_queue) @@ -418,7 +420,9 @@ async def _listen_task(self) -> None: # listen to incoming room events while True: event = await self._ffi_queue.get() - if event.room_event.room_handle == self._ffi_handle.handle: # type: ignore + if event.WhichOneof("message") == "rpc_method_invocation": + self._on_rpc_method_invocation(event.rpc_method_invocation) + elif event.room_event.room_handle == self._ffi_handle.handle: # type: ignore if event.room_event.HasField("eos"): break @@ -436,6 +440,30 @@ async def _listen_task(self) -> None: self._room_queue.put_nowait(event) await self._room_queue.join() + # Clean up any pending RPC invocation tasks + await self._drain_rpc_invocation_tasks() + + def _on_rpc_method_invocation(self, rpc_invocation: RpcMethodInvocationEvent): + if self._local_participant is None: + return + + if ( + rpc_invocation.local_participant_handle + == self._local_participant._ffi_handle.handle + ): + task = self._loop.create_task( + self._local_participant._handle_rpc_method_invocation( + rpc_invocation.invocation_id, + rpc_invocation.method, + rpc_invocation.request_id, + rpc_invocation.caller_identity, + rpc_invocation.payload, + rpc_invocation.response_timeout_ms / 1000.0, + ) + ) + self._rpc_invocation_tasks.add(task) + task.add_done_callback(self._rpc_invocation_tasks.discard) + def _on_room_event(self, event: proto_room.RoomEvent): which = event.WhichOneof("message") if which == "participant_connected": @@ -682,6 +710,12 @@ def _on_room_event(self, event: proto_room.RoomEvent): elif which == "reconnected": self.emit("reconnected") + async def _drain_rpc_invocation_tasks(self) -> None: + if self._rpc_invocation_tasks: + for task in self._rpc_invocation_tasks: + task.cancel() + await asyncio.gather(*self._rpc_invocation_tasks, return_exceptions=True) + def _retrieve_remote_participant( self, identity: str ) -> Optional[RemoteParticipant]: diff --git a/livekit-rtc/livekit/rtc/rpc.py b/livekit-rtc/livekit/rtc/rpc.py new file mode 100644 index 00000000..10e4d6d8 --- /dev/null +++ b/livekit-rtc/livekit/rtc/rpc.py @@ -0,0 +1,124 @@ +# Copyright 2023 LiveKit, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Optional, Dict, Union, ClassVar +from enum import IntEnum +from ._proto import rpc_pb2 as proto_rpc +from dataclasses import dataclass + + +@dataclass +class RpcInvocationData: + """Data passed to method handler for incoming RPC invocations + + Attributes: + request_id (str): The unique request ID. Will match at both sides of the call, useful for debugging or logging. + caller_identity (str): The unique participant identity of the caller. + payload (str): The payload of the request. User-definable format, typically JSON. + response_timeout (float): The maximum time the caller will wait for a response. + """ + + request_id: str + caller_identity: str + payload: str + response_timeout: float + + +class RpcError(Exception): + """ + Specialized error handling for RPC methods. + + Instances of this type, when thrown in a method handler, will have their `message` + serialized and sent across the wire. The caller will receive an equivalent error on the other side. + + Built-in errors are included (codes 1001-1999) but developers may use the code, message, and data fields to create their own errors. + """ + + class ErrorCode(IntEnum): + APPLICATION_ERROR = 1500 + CONNECTION_TIMEOUT = 1501 + RESPONSE_TIMEOUT = 1502 + RECIPIENT_DISCONNECTED = 1503 + RESPONSE_PAYLOAD_TOO_LARGE = 1504 + SEND_FAILED = 1505 + + UNSUPPORTED_METHOD = 1400 + RECIPIENT_NOT_FOUND = 1401 + REQUEST_PAYLOAD_TOO_LARGE = 1402 + UNSUPPORTED_SERVER = 1403 + UNSUPPORTED_VERSION = 1404 + + ErrorMessage: ClassVar[Dict[ErrorCode, str]] = { + ErrorCode.APPLICATION_ERROR: "Application error in method handler", + ErrorCode.CONNECTION_TIMEOUT: "Connection timeout", + ErrorCode.RESPONSE_TIMEOUT: "Response timeout", + ErrorCode.RECIPIENT_DISCONNECTED: "Recipient disconnected", + ErrorCode.RESPONSE_PAYLOAD_TOO_LARGE: "Response payload too large", + ErrorCode.SEND_FAILED: "Failed to send", + ErrorCode.UNSUPPORTED_METHOD: "Method not supported at destination", + ErrorCode.RECIPIENT_NOT_FOUND: "Recipient not found", + ErrorCode.REQUEST_PAYLOAD_TOO_LARGE: "Request payload too large", + ErrorCode.UNSUPPORTED_SERVER: "RPC not supported by server", + ErrorCode.UNSUPPORTED_VERSION: "Unsupported RPC version", + } + + def __init__( + self, + code: Union[int, "RpcError.ErrorCode"], + message: str, + data: Optional[str] = None, + ): + """ + Creates an error object with the given code and message, plus an optional data payload. + + If thrown in an RPC method handler, the error will be sent back to the caller. + + Args: + code (int): Your error code (Error codes 1001-1999 are reserved for built-in errors) + message (str): A readable error message. + data (Optional[str]): Optional additional data associated with the error (JSON recommended) + """ + super().__init__(message) + self._code = code + self._message = message + self._data = data + + @property + def code(self) -> int: + """Error code value. Codes 1001-1999 are reserved for built-in errors (see RpcError.ErrorCode for their meanings).""" + return self._code + + @property + def message(self) -> str: + """A readable error message.""" + return self._message + + @property + def data(self) -> Optional[str]: + """Optional additional data associated with the error (JSON recommended).""" + return self._data + + @classmethod + def _from_proto(cls, proto: proto_rpc.RpcError) -> "RpcError": + return cls(proto.code, proto.message, proto.data) + + def _to_proto(self) -> proto_rpc.RpcError: + return proto_rpc.RpcError(code=self.code, message=self.message, data=self.data) + + @classmethod + def _built_in( + cls, code: "RpcError.ErrorCode", data: Optional[str] = None + ) -> "RpcError": + message = cls.ErrorMessage[code] + return cls(code, message, data)