From 585a6c144a1c79180fb83929b210d2dcb90f3964 Mon Sep 17 00:00:00 2001 From: Neil Dwyer Date: Thu, 14 Dec 2023 16:57:48 -0800 Subject: [PATCH] Instantiate FfiClient Lazily (#125) --- livekit-rtc/livekit/rtc/_ffi_client.py | 15 ++++++++---- livekit-rtc/livekit/rtc/_utils.py | 8 +++++++ livekit-rtc/livekit/rtc/audio_frame.py | 6 ++--- livekit-rtc/livekit/rtc/audio_source.py | 10 ++++---- livekit-rtc/livekit/rtc/audio_stream.py | 10 ++++---- livekit-rtc/livekit/rtc/e2ee.py | 22 +++++++++--------- livekit-rtc/livekit/rtc/participant.py | 24 ++++++++++---------- livekit-rtc/livekit/rtc/room.py | 22 +++++++++--------- livekit-rtc/livekit/rtc/track.py | 12 +++++----- livekit-rtc/livekit/rtc/track_publication.py | 4 ++-- livekit-rtc/livekit/rtc/video_frame.py | 10 ++++---- livekit-rtc/livekit/rtc/video_source.py | 6 ++--- livekit-rtc/livekit/rtc/video_stream.py | 10 ++++---- 13 files changed, 86 insertions(+), 73 deletions(-) diff --git a/livekit-rtc/livekit/rtc/_ffi_client.py b/livekit-rtc/livekit/rtc/_ffi_client.py index 75aedff3..397df55f 100644 --- a/livekit-rtc/livekit/rtc/_ffi_client.py +++ b/livekit-rtc/livekit/rtc/_ffi_client.py @@ -23,7 +23,7 @@ import pkg_resources from ._proto import ffi_pb2 as proto_ffi -from ._utils import Queue +from ._utils import Queue, classproperty logger = logging.getLogger("livekit") @@ -142,7 +142,7 @@ def ffi_event_callback( return # no need to queue the logs - ffi_client.queue.put(event) + FfiClient.instance.queue.put(event) def to_python_level(level: proto_ffi.LogLevel.ValueType) -> int: @@ -161,6 +161,14 @@ def to_python_level(level: proto_ffi.LogLevel.ValueType) -> int: class FfiClient: + _instance: Optional["FfiClient"] = None + + @classproperty + def instance(self): + if self._instance is None: + self._instance = FfiClient() + return self._instance + def __init__(self) -> None: self._lock = threading.RLock() self._queue = FfiQueue[proto_ffi.FfiEvent]() @@ -188,6 +196,3 @@ def request(self, req: proto_ffi.FfiRequest) -> proto_ffi.FfiResponse: FfiHandle(handle) return resp - - -ffi_client = FfiClient() diff --git a/livekit-rtc/livekit/rtc/_utils.py b/livekit-rtc/livekit/rtc/_utils.py index 83c99050..0feb9138 100644 --- a/livekit-rtc/livekit/rtc/_utils.py +++ b/livekit-rtc/livekit/rtc/_utils.py @@ -7,6 +7,14 @@ logger = logging.getLogger("livekit") +class classproperty(object): + def __init__(self, f): + self.f = classmethod(f) + + def __get__(self, *a): + return self.f.__get__(*a)() + + def task_done_logger(task: asyncio.Task) -> None: if task.cancelled(): logger.info("task cancelled: %s", task) diff --git a/livekit-rtc/livekit/rtc/audio_frame.py b/livekit-rtc/livekit/rtc/audio_frame.py index ca53e44d..956bb5e0 100644 --- a/livekit-rtc/livekit/rtc/audio_frame.py +++ b/livekit-rtc/livekit/rtc/audio_frame.py @@ -13,7 +13,7 @@ # limitations under the License. import ctypes -from ._ffi_client import FfiHandle, ffi_client +from ._ffi_client import FfiHandle, FfiClient from ._proto import audio_frame_pb2 as proto_audio from ._proto import ffi_pb2 as proto_ffi from ._utils import get_address @@ -65,7 +65,7 @@ def remix_and_resample(self, sample_rate: int, num_channels: int) -> "AudioFrame req = proto_ffi.FfiRequest() req.new_audio_resampler.CopyFrom(proto_audio.NewAudioResamplerRequest()) - resp = ffi_client.request(req) + resp = FfiClient.instance.request(req) resampler_handle = FfiHandle(resp.new_audio_resampler.resampler.handle.id) resample_req = proto_ffi.FfiRequest() @@ -74,7 +74,7 @@ def remix_and_resample(self, sample_rate: int, num_channels: int) -> "AudioFrame resample_req.remix_and_resample.sample_rate = sample_rate resample_req.remix_and_resample.num_channels = num_channels - resp = ffi_client.request(resample_req) + resp = FfiClient.instance.request(resample_req) return AudioFrame._from_owned_info(resp.remix_and_resample.buffer) def _proto_info(self) -> proto_audio.AudioFrameBufferInfo: diff --git a/livekit-rtc/livekit/rtc/audio_source.py b/livekit-rtc/livekit/rtc/audio_source.py index bb1ef865..7af7e012 100644 --- a/livekit-rtc/livekit/rtc/audio_source.py +++ b/livekit-rtc/livekit/rtc/audio_source.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ._ffi_client import FfiHandle, ffi_client +from ._ffi_client import FfiHandle, FfiClient from ._proto import audio_frame_pb2 as proto_audio_frame from ._proto import ffi_pb2 as proto_ffi from .audio_frame import AudioFrame @@ -27,7 +27,7 @@ def __init__(self, sample_rate: int, num_channels: int) -> None: req.new_audio_source.sample_rate = sample_rate req.new_audio_source.num_channels = num_channels - resp = ffi_client.request(req) + resp = FfiClient.instance.request(req) self._info = resp.new_audio_source.source self._ffi_handle = FfiHandle(self._info.handle.id) @@ -37,15 +37,15 @@ async def capture_frame(self, frame: AudioFrame) -> None: req.capture_audio_frame.source_handle = self._ffi_handle.handle req.capture_audio_frame.buffer.CopyFrom(frame._proto_info()) - queue = ffi_client.queue.subscribe() + queue = FfiClient.instance.queue.subscribe() try: - resp = ffi_client.request(req) + resp = FfiClient.instance.request(req) cb = await queue.wait_for( lambda e: e.capture_audio_frame.async_id == resp.capture_audio_frame.async_id ) finally: - ffi_client.queue.unsubscribe(queue) + FfiClient.instance.queue.unsubscribe(queue) if cb.capture_audio_frame.error: raise Exception(cb.capture_audio_frame.error) diff --git a/livekit-rtc/livekit/rtc/audio_stream.py b/livekit-rtc/livekit/rtc/audio_stream.py index f206d852..c8aa0600 100644 --- a/livekit-rtc/livekit/rtc/audio_stream.py +++ b/livekit-rtc/livekit/rtc/audio_stream.py @@ -15,7 +15,7 @@ import asyncio from typing import Optional -from ._ffi_client import FfiHandle, ffi_client +from ._ffi_client import FfiHandle, FfiClient from ._proto import audio_frame_pb2 as proto_audio_frame from ._proto import ffi_pb2 as proto_ffi from ._utils import RingQueue, task_done_logger @@ -32,14 +32,14 @@ def __init__( ) -> None: self._track = track self._loop = loop or asyncio.get_event_loop() - self._ffi_queue = ffi_client.queue.subscribe(self._loop) + self._ffi_queue = FfiClient.instance.queue.subscribe(self._loop) self._queue: RingQueue[AudioFrame] = RingQueue(capacity) req = proto_ffi.FfiRequest() new_audio_stream = req.new_audio_stream new_audio_stream.track_handle = track._ffi_handle.handle new_audio_stream.type = proto_audio_frame.AudioStreamType.AUDIO_STREAM_NATIVE - resp = ffi_client.request(req) + resp = FfiClient.instance.request(req) stream_info = resp.new_audio_stream.stream self._ffi_handle = FfiHandle(stream_info.handle.id) @@ -49,7 +49,7 @@ def __init__( self._task.add_done_callback(task_done_logger) def __del__(self) -> None: - ffi_client.queue.unsubscribe(self._ffi_queue) + FfiClient.instance.queue.unsubscribe(self._ffi_queue) async def _run(self): while True: @@ -63,7 +63,7 @@ async def _run(self): elif audio_event.HasField("eos"): break - ffi_client.queue.unsubscribe(self._ffi_queue) + FfiClient.instance.queue.unsubscribe(self._ffi_queue) async def aclose(self): self._ffi_handle.dispose() diff --git a/livekit-rtc/livekit/rtc/e2ee.py b/livekit-rtc/livekit/rtc/e2ee.py index 2db86ef7..3b296bd0 100644 --- a/livekit-rtc/livekit/rtc/e2ee.py +++ b/livekit-rtc/livekit/rtc/e2ee.py @@ -15,7 +15,7 @@ from dataclasses import dataclass, field from typing import List, Optional -from ._ffi_client import ffi_client +from ._ffi_client import FfiClient from ._proto import e2ee_pb2 as proto_e2ee from ._proto import ffi_pb2 as proto_ffi @@ -52,13 +52,13 @@ def set_shared_key(self, key: bytes, key_index: int) -> None: req.e2ee.room_handle = self._room_handle req.e2ee.set_shared_key.key_index = key_index req.e2ee.set_shared_key.shared_key = key - ffi_client.request(req) + FfiClient.instance.request(req) def export_shared_key(self, key_index: int) -> bytes: req = proto_ffi.FfiRequest() req.e2ee.room_handle = self._room_handle req.e2ee.get_shared_key.key_index = key_index - resp = ffi_client.request(req) + resp = FfiClient.instance.request(req) key = resp.e2ee.get_shared_key.key return key @@ -67,7 +67,7 @@ def ratchet_shared_key(self, key_index: int) -> bytes: req.e2ee.room_handle = self._room_handle req.e2ee.ratchet_shared_key.key_index = key_index - resp = ffi_client.request(req) + resp = FfiClient.instance.request(req) new_key = resp.e2ee.ratchet_shared_key.new_key return new_key @@ -80,14 +80,14 @@ def set_key(self, participant_identity: str, key: bytes, key_index: int) -> None req.e2ee.set_key.key = key self.key_index = key_index - ffi_client.request(req) + FfiClient.instance.request(req) def export_key(self, participant_identity: str, key_index: int) -> bytes: req = proto_ffi.FfiRequest() req.e2ee.room_handle = self._room_handle req.e2ee.get_key.participant_identity = participant_identity req.e2ee.get_key.key_index = key_index - resp = ffi_client.request(req) + resp = FfiClient.instance.request(req) key = resp.e2ee.get_key.key return key @@ -97,7 +97,7 @@ def ratchet_key(self, participant_identity: str, key_index: int) -> bytes: req.e2ee.ratchet_key.participant_identity = participant_identity req.e2ee.ratchet_key.key_index = key_index - resp = ffi_client.request(req) + resp = FfiClient.instance.request(req) new_key = resp.e2ee.ratchet_key.new_key return new_key @@ -129,7 +129,7 @@ def set_enabled(self, enabled: bool) -> None: req.e2ee.room_handle = self._room_handle req.e2ee.cryptor_set_enabled.participant_identity = self._participant_identity req.e2ee.cryptor_set_enabled.enabled = enabled - ffi_client.request(req) + FfiClient.instance.request(req) def set_key_index(self, key_index: int) -> None: self._key_index = key_index @@ -137,7 +137,7 @@ def set_key_index(self, key_index: int) -> None: req.e2ee.room_handle = self._room_handle req.e2ee.cryptor_set_key_index.participant_identity = self._participant_identity req.e2ee.cryptor_set_key_index.key_index = key_index - ffi_client.request(req) + FfiClient.instance.request(req) class E2EEManager: @@ -164,13 +164,13 @@ def set_enabled(self, enabled: bool) -> None: req = proto_ffi.FfiRequest() req.e2ee.room_handle = self._room_handle req.e2ee.manager_set_enabled.enabled = enabled - ffi_client.request(req) + FfiClient.instance.request(req) def frame_cryptors(self) -> List[FrameCryptor]: req = proto_ffi.FfiRequest() req.e2ee.room_handle = self._room_handle - resp = ffi_client.request(req) + resp = FfiClient.instance.request(req) frame_cryptors = [] for frame_cryptor in resp.e2ee.manager_get_frame_cryptors.frame_cryptors: frame_cryptors.append( diff --git a/livekit-rtc/livekit/rtc/participant.py b/livekit-rtc/livekit/rtc/participant.py index be0f393a..9ea65200 100644 --- a/livekit-rtc/livekit/rtc/participant.py +++ b/livekit-rtc/livekit/rtc/participant.py @@ -15,7 +15,7 @@ import ctypes from typing import List, Union -from ._ffi_client import FfiHandle, ffi_client +from ._ffi_client import FfiHandle, FfiClient from ._proto import ffi_pb2 as proto_ffi from ._proto import participant_pb2 as proto_participant from ._proto.room_pb2 import DataPacketKind, TrackPublishOptions @@ -105,14 +105,14 @@ async def publish_data( req.publish_data.destination_sids.extend(sids) - queue = ffi_client.queue.subscribe() + queue = FfiClient.instance.queue.subscribe() try: - resp = ffi_client.request(req) + resp = FfiClient.instance.request(req) cb = await queue.wait_for( lambda e: e.publish_data.async_id == resp.publish_data.async_id ) finally: - ffi_client.queue.unsubscribe(queue) + FfiClient.instance.queue.unsubscribe(queue) if cb.publish_data.error: raise PublishDataError(cb.publish_data.error) @@ -122,30 +122,30 @@ async def update_metadata(self, metadata: str) -> None: req.update_local_metadata.local_participant_handle = self._ffi_handle.handle req.update_local_metadata.metadata = metadata - queue = ffi_client.queue.subscribe() + queue = FfiClient.instance.queue.subscribe() try: - resp = ffi_client.request(req) + resp = FfiClient.instance.request(req) await queue.wait_for( lambda e: e.update_local_metadata.async_id == resp.update_local_metadata.async_id ) finally: - ffi_client.queue.unsubscribe(queue) + FfiClient.instance.queue.unsubscribe(queue) async def update_name(self, name: str) -> None: req = proto_ffi.FfiRequest() req.update_local_name.local_participant_handle = self._ffi_handle.handle req.update_local_name.name = name - queue = ffi_client.queue.subscribe() + queue = FfiClient.instance.queue.subscribe() try: - resp = ffi_client.request(req) + resp = FfiClient.instance.request(req) await queue.wait_for( lambda e: e.update_local_name.async_id == resp.update_local_name.async_id ) finally: - ffi_client.queue.unsubscribe(queue) + FfiClient.instance.queue.unsubscribe(queue) async def publish_track( self, track: LocalTrack, options: TrackPublishOptions @@ -157,7 +157,7 @@ async def publish_track( queue = self._room_queue.subscribe() try: - resp = ffi_client.request(req) + resp = FfiClient.instance.request(req) cb = await queue.wait_for( lambda e: e.publish_track.async_id == resp.publish_track.async_id ) @@ -181,7 +181,7 @@ async def unpublish_track(self, track_sid: str) -> None: queue = self._room_queue.subscribe() try: - resp = ffi_client.request(req) + resp = FfiClient.instance.request(req) cb = await queue.wait_for( lambda e: e.unpublish_track.async_id == resp.unpublish_track.async_id ) diff --git a/livekit-rtc/livekit/rtc/room.py b/livekit-rtc/livekit/rtc/room.py index 52ea5012..aa5ad879 100644 --- a/livekit-rtc/livekit/rtc/room.py +++ b/livekit-rtc/livekit/rtc/room.py @@ -17,7 +17,7 @@ import logging from dataclasses import dataclass, field from typing import Dict, Optional, Literal -from ._ffi_client import FfiHandle, ffi_client +from ._ffi_client import FfiHandle, FfiClient from ._proto import ffi_pb2 as proto_ffi from ._proto import participant_pb2 as proto_participant from ._proto import room_pb2 as proto_room @@ -95,7 +95,7 @@ def __init__(self, loop: Optional[asyncio.AbstractEventLoop] = None) -> None: def __del__(self) -> None: if self._ffi_handle is not None: - ffi_client.queue.unsubscribe(self._ffi_queue) + FfiClient.instance.queue.unsubscribe(self._ffi_queue) @property def sid(self) -> str: @@ -157,19 +157,19 @@ async def connect( ) # subscribe before connecting so we don't miss any events - self._ffi_queue = ffi_client.queue.subscribe(self._loop) + self._ffi_queue = FfiClient.instance.queue.subscribe(self._loop) - queue = ffi_client.queue.subscribe() + queue = FfiClient.instance.queue.subscribe() try: - resp = ffi_client.request(req) + resp = FfiClient.instance.request(req) cb = await queue.wait_for( lambda e: e.connect.async_id == resp.connect.async_id ) finally: - ffi_client.queue.unsubscribe(queue) + FfiClient.instance.queue.unsubscribe(queue) if cb.connect.error: - ffi_client.queue.unsubscribe(self._ffi_queue) + FfiClient.instance.queue.unsubscribe(self._ffi_queue) raise ConnectError(cb.connect.error) self._ffi_handle = FfiHandle(cb.connect.room.handle.id) @@ -201,17 +201,17 @@ async def disconnect(self) -> None: req = proto_ffi.FfiRequest() req.disconnect.room_handle = self._ffi_handle.handle # type: ignore - queue = ffi_client.queue.subscribe() + queue = FfiClient.instance.queue.subscribe() try: - resp = ffi_client.request(req) + resp = FfiClient.instance.request(req) await queue.wait_for( lambda e: e.disconnect.async_id == resp.disconnect.async_id ) finally: - ffi_client.queue.unsubscribe(queue) + FfiClient.instance.queue.unsubscribe(queue) await self._task - ffi_client.queue.unsubscribe(self._ffi_queue) + FfiClient.instance.queue.unsubscribe(self._ffi_queue) async def _listen_task(self) -> None: # listen to incoming room events diff --git a/livekit-rtc/livekit/rtc/track.py b/livekit-rtc/livekit/rtc/track.py index 31b40bc1..abfe1adc 100644 --- a/livekit-rtc/livekit/rtc/track.py +++ b/livekit-rtc/livekit/rtc/track.py @@ -13,7 +13,7 @@ # limitations under the License. from typing import TYPE_CHECKING, List, Union -from ._ffi_client import FfiHandle, ffi_client +from ._ffi_client import FfiHandle, FfiClient from ._proto import ffi_pb2 as proto_ffi from ._proto import track_pb2 as proto_track from ._proto import stats_pb2 as proto_stats @@ -52,14 +52,14 @@ async def get_stats(self) -> List[proto_stats.RtcStats]: req = proto_ffi.FfiRequest() req.get_stats.track_handle = self._ffi_handle.handle - queue = ffi_client.queue.subscribe() + queue = FfiClient.instance.queue.subscribe() try: - resp = ffi_client.request(req) + resp = FfiClient.instance.request(req) cb = await queue.wait_for( lambda e: e.get_stats.async_id == resp.get_stats.async_id ) finally: - ffi_client.queue.unsubscribe(queue) + FfiClient.instance.queue.unsubscribe(queue) if cb.get_stats.error: raise Exception(cb.get_stats.error) @@ -77,7 +77,7 @@ def create_audio_track(name: str, source: "AudioSource") -> "LocalAudioTrack": req.create_audio_track.name = name req.create_audio_track.source_handle = source._ffi_handle.handle - resp = ffi_client.request(req) + resp = FfiClient.instance.request(req) return LocalAudioTrack(resp.create_audio_track.track) @@ -91,7 +91,7 @@ def create_video_track(name: str, source: "VideoSource") -> "LocalVideoTrack": req.create_video_track.name = name req.create_video_track.source_handle = source._ffi_handle.handle - resp = ffi_client.request(req) + resp = FfiClient.instance.request(req) return LocalVideoTrack(resp.create_video_track.track) diff --git a/livekit-rtc/livekit/rtc/track_publication.py b/livekit-rtc/livekit/rtc/track_publication.py index ddf67a00..104acda8 100644 --- a/livekit-rtc/livekit/rtc/track_publication.py +++ b/livekit-rtc/livekit/rtc/track_publication.py @@ -14,7 +14,7 @@ from typing import Optional -from ._ffi_client import FfiHandle, ffi_client +from ._ffi_client import FfiHandle, FfiClient from ._proto import e2ee_pb2 as proto_e2ee from ._proto import ffi_pb2 as proto_ffi from ._proto import track_pb2 as proto_track @@ -82,4 +82,4 @@ def set_subscribed(self, subscribed: bool): req = proto_ffi.FfiRequest() req.set_subscribed.subscribe = subscribed req.set_subscribed.publication_handle = self._ffi_handle.handle - ffi_client.request(req) + FfiClient.instance.request(req) diff --git a/livekit-rtc/livekit/rtc/video_frame.py b/livekit-rtc/livekit/rtc/video_frame.py index 159ffe7b..0ad82a94 100644 --- a/livekit-rtc/livekit/rtc/video_frame.py +++ b/livekit-rtc/livekit/rtc/video_frame.py @@ -15,7 +15,7 @@ import ctypes from typing import Union -from ._ffi_client import FfiHandle, ffi_client +from ._ffi_client import FfiHandle, FfiClient from ._proto import ffi_pb2 as proto_ffi from ._utils import get_address from ._proto import video_frame_pb2 as proto_video_frame @@ -75,7 +75,7 @@ def _proto_info(self) -> proto_video_frame.VideoFrameBufferInfo: def to_i420(self) -> "I420Buffer": req = proto_ffi.FfiRequest() req.to_i420.buffer.CopyFrom(self._proto_info()) - resp = ffi_client.request(req) + resp = FfiClient.instance.request(req) return I420Buffer._from_owned_info(resp.to_i420.buffer) def to_argb(self, dst: "ArgbFrame") -> None: @@ -86,7 +86,7 @@ def to_argb(self, dst: "ArgbFrame") -> None: req.to_argb.dst_stride = dst.stride req.to_argb.dst_width = dst.width req.to_argb.dst_height = dst.height - ffi_client.request(req) + FfiClient.instance.request(req) @staticmethod def _from_owned_info( @@ -139,7 +139,7 @@ def _from_owned_info( def to_i420(self) -> "I420Buffer": req = proto_ffi.FfiRequest() req.to_i420.handle = self._ffi_handle.handle - resp = ffi_client.request(req) + resp = FfiClient.instance.request(req) return I420Buffer._from_owned_info(resp.to_i420.buffer) def to_argb(self, dst: "ArgbFrame") -> None: @@ -768,7 +768,7 @@ def to_i420(self) -> I420Buffer: req.to_i420.argb.height = self.height req.to_i420.argb.stride = self.stride req.to_i420.argb.ptr = get_address(memoryview(self._data)) - res = ffi_client.request(req) + res = FfiClient.instance.request(req) return I420Buffer._from_owned_info(res.to_i420.buffer) @property diff --git a/livekit-rtc/livekit/rtc/video_source.py b/livekit-rtc/livekit/rtc/video_source.py index 4ccb4b41..8029e831 100644 --- a/livekit-rtc/livekit/rtc/video_source.py +++ b/livekit-rtc/livekit/rtc/video_source.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ._ffi_client import FfiHandle, ffi_client +from ._ffi_client import FfiHandle, FfiClient from ._proto import ffi_pb2 as proto_ffi from ._proto import video_frame_pb2 as proto_video_frame from .video_frame import VideoFrame @@ -27,7 +27,7 @@ def __init__(self, width: int, height: int) -> None: req.new_video_source.resolution.width = width req.new_video_source.resolution.height = height - resp = ffi_client.request(req) + resp = FfiClient.instance.request(req) self._info = resp.new_video_source.source self._ffi_handle = FfiHandle(self._info.handle.id) @@ -37,4 +37,4 @@ def capture_frame(self, frame: VideoFrame) -> None: req.capture_video_frame.info.CopyFrom(frame.buffer._proto_info()) req.capture_video_frame.frame.rotation = frame.rotation req.capture_video_frame.frame.timestamp_us = frame.timestamp_us - ffi_client.request(req) + FfiClient.instance.request(req) diff --git a/livekit-rtc/livekit/rtc/video_stream.py b/livekit-rtc/livekit/rtc/video_stream.py index 8a9bf01b..76346693 100644 --- a/livekit-rtc/livekit/rtc/video_stream.py +++ b/livekit-rtc/livekit/rtc/video_stream.py @@ -15,7 +15,7 @@ import asyncio from typing import Optional -from ._ffi_client import FfiHandle, ffi_client +from ._ffi_client import FfiHandle, FfiClient from ._proto import ffi_pb2 as proto_ffi from ._proto import video_frame_pb2 as proto_video_frame from ._utils import RingQueue, task_done_logger @@ -32,14 +32,14 @@ def __init__( ) -> None: self._track = track self._loop = loop or asyncio.get_event_loop() - self._ffi_queue = ffi_client.queue.subscribe(self._loop) + self._ffi_queue = FfiClient.instance.queue.subscribe(self._loop) self._queue: RingQueue[VideoFrame] = RingQueue(capacity) req = proto_ffi.FfiRequest() new_video_stream = req.new_video_stream new_video_stream.track_handle = track._ffi_handle.handle new_video_stream.type = proto_video_frame.VideoStreamType.VIDEO_STREAM_NATIVE - resp = ffi_client.request(req) + resp = FfiClient.instance.request(req) stream_info = resp.new_video_stream.stream self._ffi_handle = FfiHandle(stream_info.handle.id) @@ -48,7 +48,7 @@ def __init__( self._task.add_done_callback(task_done_logger) def __del__(self) -> None: - ffi_client.queue.unsubscribe(self._ffi_queue) + FfiClient.instance.queue.unsubscribe(self._ffi_queue) async def _run(self): while True: @@ -68,7 +68,7 @@ async def _run(self): elif video_event.HasField("eos"): break - ffi_client.queue.unsubscribe(self._ffi_queue) + FfiClient.instance.queue.unsubscribe(self._ffi_queue) async def aclose(self): self._ffi_handle.dispose()