From 4063504bd9282ca434de06e792402a73b647f0b1 Mon Sep 17 00:00:00 2001 From: David Zhao Date: Sun, 14 Jul 2024 08:57:29 -1000 Subject: [PATCH] Avoid crashing the FfiQueue when subscriber is not cleaned up (#219) --- examples/multiple_connections.py | 66 ++++++++++++++++++++++++++ livekit-rtc/livekit/rtc/_ffi_client.py | 7 ++- 2 files changed, 72 insertions(+), 1 deletion(-) create mode 100644 examples/multiple_connections.py diff --git a/examples/multiple_connections.py b/examples/multiple_connections.py new file mode 100644 index 00000000..3bf01d7e --- /dev/null +++ b/examples/multiple_connections.py @@ -0,0 +1,66 @@ +import os +import asyncio +from livekit import api, rtc + +# This example demonstrates running multiple connections sequentially in the same thread. +# This is useful when interoperating with a synchronous framework like Django or Flask +# where you would connect to a LiveKit room as part of a request handler. + +# LIVEKIT_URL needs to be set +# also, set either LIVEKIT_TOKEN, or API_KEY and API_SECRET + + +async def main(): + url = os.environ["LIVEKIT_URL"] + token = os.getenv("LIVEKIT_TOKEN") + room = rtc.Room() + if not token: + token = ( + api.AccessToken() + .with_identity("python-bot") + .with_name("Python Bot") + .with_grants( + api.VideoGrants( + room_join=True, + room="my-room", + ) + ) + .to_jwt() + ) + + track_sub = asyncio.Event() + + @room.on("track_subscribed") + def on_track_subscribed( + track: rtc.Track, + publication: rtc.RemoteTrackPublication, + participant: rtc.RemoteParticipant, + ): + if track.kind == rtc.TrackKind.KIND_AUDIO: + stream = rtc.AudioStream(track) # the error comes from this line + track_sub.set() + # any created streams would need to be closed explicitly to avoid leaks + asyncio.get_event_loop().create_task(stream.aclose()) + print("subscribed to audio track") + + await room.connect(url, token) + print(f"connected to room: {room.name}") + await track_sub.wait() + await room.disconnect() + print("disconnected from room") + + +def ensure_event_loop(): + try: + return asyncio.get_event_loop() + except RuntimeError: + # Create a new event loop if none exists (this can happen in some contexts like certain threads) + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + return loop + + +if __name__ == "__main__": + asyncio.run(main()) + asyncio.run(main()) + print("successfully ran multiple connections") diff --git a/livekit-rtc/livekit/rtc/_ffi_client.py b/livekit-rtc/livekit/rtc/_ffi_client.py index 9412bfba..7aaba306 100644 --- a/livekit-rtc/livekit/rtc/_ffi_client.py +++ b/livekit-rtc/livekit/rtc/_ffi_client.py @@ -106,7 +106,12 @@ def __init__(self) -> None: def put(self, item: T) -> None: with self._lock: for queue, loop in self._subscribers: - loop.call_soon_threadsafe(queue.put_nowait, item) + try: + loop.call_soon_threadsafe(queue.put_nowait, item) + except Exception as e: + # this could happen if user closes the runloop without unsubscribing first + # it's not good when it does occur, but we should not fail the entire runloop + logger.error("error putting to queue: %s", e) def subscribe(self, loop: Optional[asyncio.AbstractEventLoop] = None) -> Queue[T]: with self._lock: