Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RPC implementation via FFI #283

Merged
merged 59 commits into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
3c8550a
deps
bcherry Oct 8, 2024
f027914
rpcerror
bcherry Oct 8, 2024
a0082c5
participant
bcherry Oct 8, 2024
a0e2bef
p
bcherry Oct 8, 2024
4511ebe
generated protobuf
github-actions[bot] Oct 8, 2024
7e01e90
Close to working
bcherry Oct 8, 2024
cec803a
Merge remote-tracking branch 'origin/bcherry/rpc' into bcherry/rpc
bcherry Oct 8, 2024
1f7296c
generated protobuf
github-actions[bot] Oct 8, 2024
6d48330
r
bcherry Oct 8, 2024
f2c50bb
wip
bcherry Oct 8, 2024
d6f8db8
Fixes
bcherry Oct 8, 2024
01b9ede
fmt
bcherry Oct 8, 2024
8923184
format
bcherry Oct 8, 2024
e22b112
fixes
bcherry Oct 8, 2024
575d79c
add divide by zero example
bcherry Oct 8, 2024
775b199
byz
bcherry Oct 8, 2024
f02e98b
ruff
bcherry Oct 8, 2024
0c7fc92
Merge remote-tracking branch 'origin/main' into bcherry/rpc
bcherry Oct 15, 2024
8c50014
use identity
bcherry Oct 15, 2024
e562202
sm
bcherry Oct 23, 2024
b263701
Merge remote-tracking branch 'origin/main' into bcherry/rpc
bcherry Oct 23, 2024
cb76a60
p
bcherry Oct 23, 2024
0afd947
fixes
bcherry Oct 23, 2024
01a2c4f
v
bcherry Oct 23, 2024
52a51a3
Merge remote-tracking branch 'origin/main' into bcherry/rpc
bcherry Oct 24, 2024
fba2a7a
generated protobuf
github-actions[bot] Oct 24, 2024
24788db
Merge remote-tracking branch 'origin/main' into bcherry/rpc
bcherry Oct 25, 2024
aeed674
error
bcherry Oct 25, 2024
63b42bc
remove
bcherry Oct 25, 2024
4df795e
readme
bcherry Oct 25, 2024
b8bd0b3
fmt
bcherry Oct 25, 2024
5b5ff03
print
bcherry Oct 25, 2024
e6196eb
Decorator variant
bcherry Oct 25, 2024
cdd942c
fmt
bcherry Oct 25, 2024
bb40151
logger
bcherry Oct 28, 2024
0d2a2d1
allow sync
bcherry Oct 28, 2024
99c7ea7
remove
bcherry Oct 28, 2024
3f04040
store task
bcherry Oct 28, 2024
98ecaa9
typoi
bcherry Oct 28, 2024
a524032
Fixes
bcherry Oct 28, 2024
b518e68
kwargs
bcherry Oct 28, 2024
47d46cb
ruff
bcherry Oct 28, 2024
4a62dec
float
bcherry Oct 28, 2024
69ef4f9
none
bcherry Oct 28, 2024
c4a4b50
dec types
bcherry Oct 28, 2024
a6a6ce6
fixes
bcherry Oct 28, 2024
fe07bd4
version
bcherry Oct 28, 2024
8a42147
params
bcherry Oct 28, 2024
b0f575a
data
bcherry Oct 28, 2024
efd7784
cast
bcherry Oct 28, 2024
460a292
fmt
bcherry Oct 28, 2024
b13435b
ex
bcherry Oct 28, 2024
a8b0dc7
Merge remote-tracking branch 'origin/main' into bcherry/rpc
bcherry Oct 30, 2024
8481411
clean up
bcherry Oct 30, 2024
2c3ba73
docs
bcherry Oct 30, 2024
6bf4749
decorator
bcherry Oct 30, 2024
c56be2f
fmt
bcherry Oct 30, 2024
c393774
readme
bcherry Oct 30, 2024
5d4b964
return
bcherry Oct 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
227 changes: 227 additions & 0 deletions examples/rpc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
from livekit import rtc, api
import os
import json
import asyncio
from dotenv import load_dotenv

load_dotenv(dotenv_path=".env.local", override=False)
LIVEKIT_API_KEY = os.getenv("LIVEKIT_API_KEY")
LIVEKIT_API_SECRET = os.getenv("LIVEKIT_API_SECRET")
LIVEKIT_URL = os.getenv("LIVEKIT_URL")
if not LIVEKIT_API_KEY or not LIVEKIT_API_SECRET or not LIVEKIT_URL:
raise ValueError(
"Missing required environment variables. Please check your .env.local file."
)


async def main():
room_name = f"rpc-test-{os.urandom(4).hex()}"

print(f"Connecting participants to room: {room_name}")

callers_room, greeters_room, math_genius_room = await asyncio.gather(
connect_participant("caller", room_name),
connect_participant("greeter", room_name),
connect_participant("math-genius", room_name),
)

# Register all methods for the receiving participant
await register_receiver_methods(greeters_room, math_genius_room)

try:
print("\n\nRunning greeting example...")
await asyncio.gather(perform_greeting(callers_room))
except Exception as error:
print("Error:", error)

try:
print("\n\nRunning error handling example...")
await perform_divide(callers_room)
except Exception as error:
print("Error:", error)

try:
print("\n\nRunning math example...")
await perform_square_root(callers_room)
await asyncio.sleep(2)
await perform_quantum_hypergeometric_series(callers_room)
except Exception as error:
print("Error:", error)

print("\n\nParticipants done, disconnecting...")
await callers_room.disconnect()
await greeters_room.disconnect()
await math_genius_room.disconnect()

print("Participants disconnected. Example completed.")


async def register_receiver_methods(
greeters_room: rtc.Room, math_genius_room: rtc.Room
):
async def arrival_method(
request_id: str,
caller: rtc.RemoteParticipant,
payload: str,
response_timeout_ms: int,
):
print(f'[Greeter] Oh {caller.identity} arrived and said "{payload}"')
await asyncio.sleep(2)
return "Welcome and have a wonderful day!"

async def square_root_method(
request_id: str,
caller: rtc.RemoteParticipant,
payload: str,
response_timeout_ms: int,
):
json_data = json.loads(payload)
number = json_data["number"]
print(
f"[Math Genius] I guess {caller.identity} wants the square root of {number}. I've only got {response_timeout_ms / 1000} seconds to respond but I think I can pull it off."
)

print("[Math Genius] *doing math*…")
await asyncio.sleep(2)

result = number**0.5
print(f"[Math Genius] Aha! It's {result}")
return json.dumps({"result": result})

async def divide_method(
request_id: str,
caller: rtc.RemoteParticipant,
payload: str,
response_timeout_ms: int,
):
json_data = json.loads(payload)
dividend = json_data["dividend"]
divisor = json_data["divisor"]
print(
f"[Math Genius] {caller.identity} wants to divide {dividend} by {divisor}."
)

result = dividend / divisor
return json.dumps({"result": result})

await greeters_room.local_participant.register_rpc_method("arrival", arrival_method)
await math_genius_room.local_participant.register_rpc_method(
"square-root", square_root_method
)
await math_genius_room.local_participant.register_rpc_method(
"divide", divide_method
)
bcherry marked this conversation as resolved.
Show resolved Hide resolved


async def perform_greeting(room: rtc.Room):
print("[Caller] Letting the greeter know that I've arrived")
try:
response = await room.local_participant.perform_rpc(
"greeter", "arrival", "Hello"
)
print(f'[Caller] That\'s nice, the greeter said: "{response}"')
except Exception as error:
print(f"[Caller] RPC call failed: {error}")
raise


async def perform_square_root(room: rtc.Room):
print("[Caller] What's the square root of 16?")
try:
response = await room.local_participant.perform_rpc(
"math-genius", "square-root", json.dumps({"number": 16})
)
parsed_response = json.loads(response)
print(f"[Caller] Nice, the answer was {parsed_response['result']}")
except Exception as error:
print(f"[Caller] RPC call failed: {error}")
raise


async def perform_quantum_hypergeometric_series(room: rtc.Room):
print("[Caller] What's the quantum hypergeometric series of 42?")
try:
response = await room.local_participant.perform_rpc(
"math-genius", "quantum-hypergeometric-series", json.dumps({"number": 42})
)
parsed_response = json.loads(response)
print(f"[Caller] genius says {parsed_response['result']}!")
except rtc.RpcError as error:
if error.code == rtc.RpcError.ErrorCode.UNSUPPORTED_METHOD:
print("[Caller] Aww looks like the genius doesn't know that one.")
return
print("[Caller] Unexpected error:", error)
raise
except Exception as error:
print("[Caller] Unexpected error:", error)
raise


async def perform_divide(room: rtc.Room):
print("[Caller] Let's divide 10 by 0.")
try:
response = await room.local_participant.perform_rpc(
"math-genius", "divide", json.dumps({"dividend": 10, "divisor": 0})
)
parsed_response = json.loads(response)
print(f"[Caller] The result is {parsed_response['result']}")
except rtc.RpcError as error:
if error.code == rtc.RpcError.ErrorCode.APPLICATION_ERROR:
print(
"[Caller] Aww something went wrong with that one, lets try something else."
)
else:
print(f"[Caller] RPC call failed with unexpected RpcError: {error}")
except Exception as error:
print(f"[Caller] RPC call failed with unexpected error: {error}")


def create_token(identity: str, room_name: str):
token = (
api.AccessToken(LIVEKIT_API_KEY, LIVEKIT_API_SECRET)
.with_identity(identity)
.with_grants(
api.VideoGrants(
room=room_name,
room_join=True,
can_publish=True,
can_subscribe=True,
)
)
)
return token.to_jwt()


async def connect_participant(identity: str, room_name: str) -> rtc.Room:
room = rtc.Room()
token = create_token(identity, room_name)

def on_disconnected(reason: str):
print(f"[{identity}] Disconnected from room: {reason}")

room.on("disconnected", on_disconnected)

await room.connect(LIVEKIT_URL, token)

async def wait_for_participants():
if room.remote_participants:
return
participant_connected = asyncio.Event()

def _on_participant_connected(participant: rtc.RemoteParticipant):
room.off("participant_connected", _on_participant_connected)
participant_connected.set()

room.on("participant_connected", _on_participant_connected)
await participant_connected.wait()

try:
await asyncio.wait_for(wait_for_participants(), timeout=5.0)
except asyncio.TimeoutError:
raise TimeoutError("Timed out waiting for participants")

return room


if __name__ == "__main__":
asyncio.run(main())
5 changes: 3 additions & 2 deletions livekit-rtc/generate_proto.sh
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@ protoc \
$FFI_PROTOCOL/track.proto \
$FFI_PROTOCOL/video_frame.proto \
$FFI_PROTOCOL/e2ee.proto \
$FFI_PROTOCOL/stats.proto
$FFI_PROTOCOL/stats.proto \
$FFI_PROTOCOL/rpc.proto

touch -a "$FFI_OUT_PYTHON/__init__.py"

for f in "$FFI_OUT_PYTHON"/*.py "$FFI_OUT_PYTHON"/*.pyi; do
perl -i -pe 's|^(import (audio_frame_pb2\|ffi_pb2\|handle_pb2\|participant_pb2\|room_pb2\|track_pb2\|video_frame_pb2\|e2ee_pb2\|stats_pb2))|from . $1|g' "$f"
perl -i -pe 's|^(import (audio_frame_pb2\|ffi_pb2\|handle_pb2\|participant_pb2\|room_pb2\|track_pb2\|video_frame_pb2\|e2ee_pb2\|stats_pb2\|rpc_pb2))|from . $1|g' "$f"
done
2 changes: 2 additions & 0 deletions livekit-rtc/livekit/rtc/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
from .video_stream import VideoFrameEvent, VideoStream
from .audio_resampler import AudioResampler, AudioResamplerQuality
from .utils import combine_audio_frames
from .rpc import RpcError

__all__ = [
"ConnectionQuality",
Expand Down Expand Up @@ -131,6 +132,7 @@
"ChatMessage",
"AudioResampler",
"AudioResamplerQuality",
"RpcError",
"combine_audio_frames",
"__version__",
]
16 changes: 13 additions & 3 deletions livekit-rtc/livekit/rtc/_proto/audio_frame_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 13 additions & 3 deletions livekit-rtc/livekit/rtc/_proto/e2ee_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading