diff --git a/livekit-rtc/livekit/rtc/room.py b/livekit-rtc/livekit/rtc/room.py index 2221eb41..b8aaf73b 100644 --- a/livekit-rtc/livekit/rtc/room.py +++ b/livekit-rtc/livekit/rtc/room.py @@ -13,6 +13,7 @@ # limitations under the License. import asyncio +import contextlib import ctypes import logging from dataclasses import dataclass, field @@ -218,22 +219,31 @@ async def connect( async def disconnect(self) -> None: if not self.isconnected(): - return + if self._task is not None: + with contextlib.suppress(asyncio.CancelledError): + await self._task + + if self._ffi_queue is not None: + FfiClient.instance.queue.unsubscribe(self._ffi_queue) - req = proto_ffi.FfiRequest() - req.disconnect.room_handle = self._ffi_handle.handle # type: ignore + return - queue = FfiClient.instance.queue.subscribe() try: - resp = FfiClient.instance.request(req) - await queue.wait_for( - lambda e: e.disconnect.async_id == resp.disconnect.async_id - ) - finally: - FfiClient.instance.queue.unsubscribe(queue) + req = proto_ffi.FfiRequest() + req.disconnect.room_handle = self._ffi_handle.handle # type: ignore + + queue = FfiClient.instance.queue.subscribe() + try: + resp = FfiClient.instance.request(req) + await queue.wait_for( + lambda e: e.disconnect.async_id == resp.disconnect.async_id + ) + finally: + FfiClient.instance.queue.unsubscribe(queue) - await self._task - FfiClient.instance.queue.unsubscribe(self._ffi_queue) + await self._task + finally: + FfiClient.instance.queue.unsubscribe(self._ffi_queue) async def _listen_task(self) -> None: # listen to incoming room events