From 76d3fd9266da29320ae9e4b614643b6b2f9d8f21 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?The=CC=81o=20Monnom?= Date: Sun, 5 May 2024 20:26:11 +0200 Subject: [PATCH] Update room.py --- livekit-rtc/livekit/rtc/room.py | 34 +++++++++++++++++++++------------ 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/livekit-rtc/livekit/rtc/room.py b/livekit-rtc/livekit/rtc/room.py index 2221eb41..b8aaf73b 100644 --- a/livekit-rtc/livekit/rtc/room.py +++ b/livekit-rtc/livekit/rtc/room.py @@ -13,6 +13,7 @@ # limitations under the License. import asyncio +import contextlib import ctypes import logging from dataclasses import dataclass, field @@ -218,22 +219,31 @@ async def connect( async def disconnect(self) -> None: if not self.isconnected(): - return + if self._task is not None: + with contextlib.suppress(asyncio.CancelledError): + await self._task + + if self._ffi_queue is not None: + FfiClient.instance.queue.unsubscribe(self._ffi_queue) - req = proto_ffi.FfiRequest() - req.disconnect.room_handle = self._ffi_handle.handle # type: ignore + return - queue = FfiClient.instance.queue.subscribe() try: - resp = FfiClient.instance.request(req) - await queue.wait_for( - lambda e: e.disconnect.async_id == resp.disconnect.async_id - ) - finally: - FfiClient.instance.queue.unsubscribe(queue) + req = proto_ffi.FfiRequest() + req.disconnect.room_handle = self._ffi_handle.handle # type: ignore + + queue = FfiClient.instance.queue.subscribe() + try: + resp = FfiClient.instance.request(req) + await queue.wait_for( + lambda e: e.disconnect.async_id == resp.disconnect.async_id + ) + finally: + FfiClient.instance.queue.unsubscribe(queue) - await self._task - FfiClient.instance.queue.unsubscribe(self._ffi_queue) + await self._task + finally: + FfiClient.instance.queue.unsubscribe(self._ffi_queue) async def _listen_task(self) -> None: # listen to incoming room events