Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RPC implementation via FFI #283

Merged
merged 59 commits into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from 52 commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
3c8550a
deps
bcherry Oct 8, 2024
f027914
rpcerror
bcherry Oct 8, 2024
a0082c5
participant
bcherry Oct 8, 2024
a0e2bef
p
bcherry Oct 8, 2024
4511ebe
generated protobuf
github-actions[bot] Oct 8, 2024
7e01e90
Close to working
bcherry Oct 8, 2024
cec803a
Merge remote-tracking branch 'origin/bcherry/rpc' into bcherry/rpc
bcherry Oct 8, 2024
1f7296c
generated protobuf
github-actions[bot] Oct 8, 2024
6d48330
r
bcherry Oct 8, 2024
f2c50bb
wip
bcherry Oct 8, 2024
d6f8db8
Fixes
bcherry Oct 8, 2024
01b9ede
fmt
bcherry Oct 8, 2024
8923184
format
bcherry Oct 8, 2024
e22b112
fixes
bcherry Oct 8, 2024
575d79c
add divide by zero example
bcherry Oct 8, 2024
775b199
byz
bcherry Oct 8, 2024
f02e98b
ruff
bcherry Oct 8, 2024
0c7fc92
Merge remote-tracking branch 'origin/main' into bcherry/rpc
bcherry Oct 15, 2024
8c50014
use identity
bcherry Oct 15, 2024
e562202
sm
bcherry Oct 23, 2024
b263701
Merge remote-tracking branch 'origin/main' into bcherry/rpc
bcherry Oct 23, 2024
cb76a60
p
bcherry Oct 23, 2024
0afd947
fixes
bcherry Oct 23, 2024
01a2c4f
v
bcherry Oct 23, 2024
52a51a3
Merge remote-tracking branch 'origin/main' into bcherry/rpc
bcherry Oct 24, 2024
fba2a7a
generated protobuf
github-actions[bot] Oct 24, 2024
24788db
Merge remote-tracking branch 'origin/main' into bcherry/rpc
bcherry Oct 25, 2024
aeed674
error
bcherry Oct 25, 2024
63b42bc
remove
bcherry Oct 25, 2024
4df795e
readme
bcherry Oct 25, 2024
b8bd0b3
fmt
bcherry Oct 25, 2024
5b5ff03
print
bcherry Oct 25, 2024
e6196eb
Decorator variant
bcherry Oct 25, 2024
cdd942c
fmt
bcherry Oct 25, 2024
bb40151
logger
bcherry Oct 28, 2024
0d2a2d1
allow sync
bcherry Oct 28, 2024
99c7ea7
remove
bcherry Oct 28, 2024
3f04040
store task
bcherry Oct 28, 2024
98ecaa9
typoi
bcherry Oct 28, 2024
a524032
Fixes
bcherry Oct 28, 2024
b518e68
kwargs
bcherry Oct 28, 2024
47d46cb
ruff
bcherry Oct 28, 2024
4a62dec
float
bcherry Oct 28, 2024
69ef4f9
none
bcherry Oct 28, 2024
c4a4b50
dec types
bcherry Oct 28, 2024
a6a6ce6
fixes
bcherry Oct 28, 2024
fe07bd4
version
bcherry Oct 28, 2024
8a42147
params
bcherry Oct 28, 2024
b0f575a
data
bcherry Oct 28, 2024
efd7784
cast
bcherry Oct 28, 2024
460a292
fmt
bcherry Oct 28, 2024
b13435b
ex
bcherry Oct 28, 2024
a8b0dc7
Merge remote-tracking branch 'origin/main' into bcherry/rpc
bcherry Oct 30, 2024
8481411
clean up
bcherry Oct 30, 2024
2c3ba73
docs
bcherry Oct 30, 2024
6bf4749
decorator
bcherry Oct 30, 2024
c56be2f
fmt
bcherry Oct 30, 2024
c393774
readme
bcherry Oct 30, 2024
5d4b964
return
bcherry Oct 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,51 @@ def on_message_received(msg: rtc.ChatMessage):
await chat.send_message("hello world")
```


### RPC

Perform your own predefined method calls from one participant to another.

This feature is especially powerful when used with [Agents](https://docs.livekit.io/agents), for instance to forward LLM function calls to your client application.

#### Registering an RPC method

The participant who implements the method and will receive its calls must first register support:

```python
@room.local_participant.rpc_method("greet")
async def handle_greet(request_id: str, caller_identity: str, payload: str, response_timeout: float):
print(f"Received greeting from {caller_identity}: {payload}")
return f"Hello, {caller_identity}!"
```

In addition to the payload, your handler will also receive `response_timeout`, which informs you the maximum time available to return a response. If you are unable to respond in time, the call will result in an error on the caller's side.

#### Performing an RPC request

The caller may then initiate an RPC call like so:

```python
try:
response = await room.local_participant.perform_rpc(
destination_identity='recipient-identity',
method='greet',
payload='Hello from RPC!'
)
print(f"RPC response: {response}")
except Exception as e:
print(f"RPC call failed: {e}")
```

You may find it useful to adjust the `response_timeout` parameter, which indicates the amount of time you will wait for a response. We recommend keeping this value as low as possible while still satisfying the constraints of your application.

#### Errors

LiveKit is a dynamic realtime environment and calls can fail for various reasons.

You may throw errors of the type `RpcError` with a string `message` in an RPC method handler and they will be received on the caller's side with the message intact. Other errors will not be transmitted and will instead arrive to the caller as `1500` ("Application Error"). Other built-in errors are detailed in `RpcError`.


## Examples

- [Facelandmark](https://github.com/livekit/python-sdks/tree/main/examples/face_landmark): Use mediapipe to detect face landmarks (eyes, nose ...)
Expand Down
287 changes: 287 additions & 0 deletions examples/rpc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,287 @@
from livekit import rtc, api
import os
import json
import asyncio
from dotenv import load_dotenv
from livekit.rtc.rpc import RpcInvocationData

load_dotenv(dotenv_path=".env.local", override=False)
LIVEKIT_API_KEY = os.getenv("LIVEKIT_API_KEY")
LIVEKIT_API_SECRET = os.getenv("LIVEKIT_API_SECRET")
LIVEKIT_URL = os.getenv("LIVEKIT_URL")
if not LIVEKIT_API_KEY or not LIVEKIT_API_SECRET or not LIVEKIT_URL:
raise ValueError(
"Missing required environment variables. Please check your .env.local file."
)


async def main():
rooms = [] # Keep track of all rooms for cleanup
try:
room_name = f"rpc-test-{os.urandom(4).hex()}"
print(f"Connecting participants to room: {room_name}")

callers_room, greeters_room, math_genius_room = await asyncio.gather(
connect_participant("caller", room_name),
connect_participant("greeter", room_name),
connect_participant("math-genius", room_name),
)
rooms = [callers_room, greeters_room, math_genius_room]

register_receiver_methods(greeters_room, math_genius_room)

try:
print("\n\nRunning greeting example...")
await asyncio.gather(perform_greeting(callers_room))
except Exception as error:
print("Error:", error)

try:
print("\n\nRunning error handling example...")
await perform_divide(callers_room)
except Exception as error:
print("Error:", error)

try:
print("\n\nRunning math example...")
await perform_square_root(callers_room)
await asyncio.sleep(2)
await perform_quantum_hypergeometric_series(callers_room)
except Exception as error:
print("Error:", error)

try:
print("\n\nRunning long calculation with timeout...")
await asyncio.create_task(perform_long_calculation(callers_room))
except Exception as error:
print("Error:", error)

try:
print("\n\nRunning long calculation with disconnect...")
# Start the long calculation
long_calc_task = asyncio.create_task(perform_long_calculation(callers_room))
# Wait a bit then disconnect the math genius
await asyncio.sleep(5)
print("\nDisconnecting math genius early...")
await math_genius_room.disconnect()
# Wait for the calculation to fail
await long_calc_task
except Exception as error:
print("Error:", error)

print("\n\nParticipants done, disconnecting remaining participants...")
await callers_room.disconnect()
await greeters_room.disconnect()

print("Participants disconnected. Example completed.")

except KeyboardInterrupt:
print("\nReceived interrupt signal, cleaning up...")
except Exception as e:
print(f"Unexpected error: {e}")
finally:
# Clean up all rooms
print("Disconnecting all participants...")
await asyncio.gather(
*(room.disconnect() for room in rooms), return_exceptions=True
)
print("Cleanup complete")


def register_receiver_methods(greeters_room: rtc.Room, math_genius_room: rtc.Room):
@greeters_room.local_participant.rpc_method("arrival")
async def arrival_method(
data: RpcInvocationData,
):
print(f'[Greeter] Oh {data.caller_identity} arrived and said "{data.payload}"')
await asyncio.sleep(2)
return "Welcome and have a wonderful day!"

@math_genius_room.local_participant.rpc_method("square-root")
async def square_root_method(
data: RpcInvocationData,
):
json_data = json.loads(data.payload)
number = json_data["number"]
print(
f"[Math Genius] I guess {data.caller_identity} wants the square root of {number}. I've only got {data.response_timeout} seconds to respond but I think I can pull it off."
)

print("[Math Genius] *doing math*…")
await asyncio.sleep(2)

result = number**0.5
print(f"[Math Genius] Aha! It's {result}")
return json.dumps({"result": result})

@math_genius_room.local_participant.rpc_method("divide")
async def divide_method(
data: RpcInvocationData,
):
json_data = json.loads(data.payload)
dividend = json_data["dividend"]
divisor = json_data["divisor"]
print(
f"[Math Genius] {data.caller_identity} wants to divide {dividend} by {divisor}."
)

result = dividend / divisor
return json.dumps({"result": result})

@math_genius_room.local_participant.rpc_method("long-calculation")
async def long_calculation_method(
data: RpcInvocationData,
):
print(
f"[Math Genius] Starting a very long calculation for {data.caller_identity}"
)
print(
f"[Math Genius] This will take 30 seconds even though you're only giving me {data.response_timeout} seconds"
)
await asyncio.sleep(30)
return json.dumps({"result": "Calculation complete!"})


async def perform_greeting(room: rtc.Room):
print("[Caller] Letting the greeter know that I've arrived")
try:
response = await room.local_participant.perform_rpc(
destination_identity="greeter", method="arrival", payload="Hello"
)
print(f'[Caller] That\'s nice, the greeter said: "{response}"')
except Exception as error:
print(f"[Caller] RPC call failed: {error}")
raise


async def perform_square_root(room: rtc.Room):
print("[Caller] What's the square root of 16?")
try:
response = await room.local_participant.perform_rpc(
destination_identity="math-genius",
method="square-root",
payload=json.dumps({"number": 16}),
)
parsed_response = json.loads(response)
print(f"[Caller] Nice, the answer was {parsed_response['result']}")
except Exception as error:
print(f"[Caller] RPC call failed: {error}")
raise


async def perform_quantum_hypergeometric_series(room: rtc.Room):
print("[Caller] What's the quantum hypergeometric series of 42?")
try:
response = await room.local_participant.perform_rpc(
destination_identity="math-genius",
method="quantum-hypergeometric-series",
payload=json.dumps({"number": 42}),
)
parsed_response = json.loads(response)
print(f"[Caller] genius says {parsed_response['result']}!")
except rtc.RpcError as error:
if error.code == rtc.RpcError.ErrorCode.UNSUPPORTED_METHOD:
print("[Caller] Aww looks like the genius doesn't know that one.")
return
print("[Caller] Unexpected error:", error)
raise
except Exception as error:
print("[Caller] Unexpected error:", error)
raise


async def perform_divide(room: rtc.Room):
print("[Caller] Let's divide 10 by 0.")
try:
response = await room.local_participant.perform_rpc(
destination_identity="math-genius",
method="divide",
payload=json.dumps({"dividend": 10, "divisor": 0}),
)
parsed_response = json.loads(response)
print(f"[Caller] The result is {parsed_response['result']}")
except rtc.RpcError as error:
if error.code == rtc.RpcError.ErrorCode.APPLICATION_ERROR:
print(
"[Caller] Aww something went wrong with that one, lets try something else."
)
else:
print(f"[Caller] RPC call failed with unexpected RpcError: {error}")
except Exception as error:
print(f"[Caller] RPC call failed with unexpected error: {error}")


async def perform_long_calculation(room: rtc.Room):
print("[Caller] Giving the math genius 10s to complete a long calculation")
try:
response = await room.local_participant.perform_rpc(
destination_identity="math-genius",
method="long-calculation",
payload=json.dumps({}),
response_timeout=10,
)
parsed_response = json.loads(response)
print(f"[Caller] Result: {parsed_response['result']}")
except rtc.RpcError as error:
if error.code == rtc.RpcError.ErrorCode.RESPONSE_TIMEOUT:
print("[Caller] Math genius took too long to respond")
elif error.code == rtc.RpcError.ErrorCode.RECIPIENT_DISCONNECTED:
print("[Caller] Math genius disconnected before response was received")
else:
print(f"[Caller] Unexpected RPC error: {error}")
except Exception as error:
print(f"[Caller] Unexpected error: {error}")


def create_token(identity: str, room_name: str):
token = (
api.AccessToken(LIVEKIT_API_KEY, LIVEKIT_API_SECRET)
.with_identity(identity)
.with_grants(
api.VideoGrants(
room=room_name,
room_join=True,
can_publish=True,
can_subscribe=True,
)
)
)
return token.to_jwt()


async def connect_participant(identity: str, room_name: str) -> rtc.Room:
room = rtc.Room()
token = create_token(identity, room_name)

def on_disconnected(reason: str):
print(f"[{identity}] Disconnected from room: {reason}")

room.on("disconnected", on_disconnected)

await room.connect(LIVEKIT_URL, token)

async def wait_for_participants():
if room.remote_participants:
return
participant_connected = asyncio.Event()

def _on_participant_connected(participant: rtc.RemoteParticipant):
room.off("participant_connected", _on_participant_connected)
participant_connected.set()

room.on("participant_connected", _on_participant_connected)
await participant_connected.wait()

try:
await asyncio.wait_for(wait_for_participants(), timeout=5.0)
except asyncio.TimeoutError:
raise TimeoutError("Timed out waiting for participants")

return room


if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\nProgram terminated by user")
3 changes: 3 additions & 0 deletions livekit-rtc/livekit/rtc/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
from .video_stream import VideoFrameEvent, VideoStream
from .audio_resampler import AudioResampler, AudioResamplerQuality
from .utils import combine_audio_frames
from .rpc import RpcError, RpcInvocationData

__all__ = [
"ConnectionQuality",
Expand Down Expand Up @@ -132,6 +133,8 @@
"ChatMessage",
"AudioResampler",
"AudioResamplerQuality",
"RpcError",
"RpcInvocationData",
"EventEmitter",
"combine_audio_frames",
"__version__",
Expand Down
Loading
Loading