Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Instantiate FfiClient Lazily #125

Merged
merged 3 commits into from
Dec 15, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions livekit-rtc/livekit/rtc/_ffi_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import pkg_resources

from ._proto import ffi_pb2 as proto_ffi
from ._utils import Queue
from ._utils import Queue, classproperty

logger = logging.getLogger("livekit")

Expand Down Expand Up @@ -142,7 +142,7 @@ def ffi_event_callback(

return # no need to queue the logs

ffi_client.queue.put(event)
FfiClient.instance.queue.put(event)


def to_python_level(level: proto_ffi.LogLevel.ValueType) -> int:
Expand All @@ -160,7 +160,17 @@ def to_python_level(level: proto_ffi.LogLevel.ValueType) -> int:
raise Exception("unreachable")


_ffi_instance: Optional["FfiClient"] = None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can move that inside the class



class FfiClient:
@classproperty
def instance(self):
global _ffi_instance
if _ffi_instance is None:
_ffi_instance = FfiClient()
return _ffi_instance

def __init__(self) -> None:
self._lock = threading.RLock()
self._queue = FfiQueue[proto_ffi.FfiEvent]()
Expand Down Expand Up @@ -188,6 +198,3 @@ def request(self, req: proto_ffi.FfiRequest) -> proto_ffi.FfiResponse:

FfiHandle(handle)
return resp


ffi_client = FfiClient()
8 changes: 8 additions & 0 deletions livekit-rtc/livekit/rtc/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@
logger = logging.getLogger("livekit")


class classproperty(object):
def __init__(self, f):
self.f = classmethod(f)

def __get__(self, *a):
return self.f.__get__(*a)()


def task_done_logger(task: asyncio.Task) -> None:
if task.cancelled():
logger.info("task cancelled: %s", task)
Expand Down
6 changes: 3 additions & 3 deletions livekit-rtc/livekit/rtc/audio_frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.

import ctypes
from ._ffi_client import FfiHandle, ffi_client
from ._ffi_client import FfiHandle, FfiClient
from ._proto import audio_frame_pb2 as proto_audio
from ._proto import ffi_pb2 as proto_ffi
from ._utils import get_address
Expand Down Expand Up @@ -65,7 +65,7 @@ def remix_and_resample(self, sample_rate: int, num_channels: int) -> "AudioFrame
req = proto_ffi.FfiRequest()
req.new_audio_resampler.CopyFrom(proto_audio.NewAudioResamplerRequest())

resp = ffi_client.request(req)
resp = FfiClient.instance.request(req)
resampler_handle = FfiHandle(resp.new_audio_resampler.resampler.handle.id)

resample_req = proto_ffi.FfiRequest()
Expand All @@ -74,7 +74,7 @@ def remix_and_resample(self, sample_rate: int, num_channels: int) -> "AudioFrame
resample_req.remix_and_resample.sample_rate = sample_rate
resample_req.remix_and_resample.num_channels = num_channels

resp = ffi_client.request(resample_req)
resp = FfiClient.instance.request(resample_req)
return AudioFrame._from_owned_info(resp.remix_and_resample.buffer)

def _proto_info(self) -> proto_audio.AudioFrameBufferInfo:
Expand Down
10 changes: 5 additions & 5 deletions livekit-rtc/livekit/rtc/audio_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from ._ffi_client import FfiHandle, ffi_client
from ._ffi_client import FfiHandle, FfiClient
from ._proto import audio_frame_pb2 as proto_audio_frame
from ._proto import ffi_pb2 as proto_ffi
from .audio_frame import AudioFrame
Expand All @@ -27,7 +27,7 @@ def __init__(self, sample_rate: int, num_channels: int) -> None:
req.new_audio_source.sample_rate = sample_rate
req.new_audio_source.num_channels = num_channels

resp = ffi_client.request(req)
resp = FfiClient.instance.request(req)
self._info = resp.new_audio_source.source
self._ffi_handle = FfiHandle(self._info.handle.id)

Expand All @@ -37,15 +37,15 @@ async def capture_frame(self, frame: AudioFrame) -> None:
req.capture_audio_frame.source_handle = self._ffi_handle.handle
req.capture_audio_frame.buffer.CopyFrom(frame._proto_info())

queue = ffi_client.queue.subscribe()
queue = FfiClient.instance.queue.subscribe()
try:
resp = ffi_client.request(req)
resp = FfiClient.instance.request(req)
cb = await queue.wait_for(
lambda e: e.capture_audio_frame.async_id
== resp.capture_audio_frame.async_id
)
finally:
ffi_client.queue.unsubscribe(queue)
FfiClient.instance.queue.unsubscribe(queue)

if cb.capture_audio_frame.error:
raise Exception(cb.capture_audio_frame.error)
10 changes: 5 additions & 5 deletions livekit-rtc/livekit/rtc/audio_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import asyncio
from typing import Optional

from ._ffi_client import FfiHandle, ffi_client
from ._ffi_client import FfiHandle, FfiClient
from ._proto import audio_frame_pb2 as proto_audio_frame
from ._proto import ffi_pb2 as proto_ffi
from ._utils import RingQueue, task_done_logger
Expand All @@ -32,14 +32,14 @@ def __init__(
) -> None:
self._track = track
self._loop = loop or asyncio.get_event_loop()
self._ffi_queue = ffi_client.queue.subscribe(self._loop)
self._ffi_queue = FfiClient.instance.queue.subscribe(self._loop)
self._queue: RingQueue[AudioFrame] = RingQueue(capacity)

req = proto_ffi.FfiRequest()
new_audio_stream = req.new_audio_stream
new_audio_stream.track_handle = track._ffi_handle.handle
new_audio_stream.type = proto_audio_frame.AudioStreamType.AUDIO_STREAM_NATIVE
resp = ffi_client.request(req)
resp = FfiClient.instance.request(req)

stream_info = resp.new_audio_stream.stream
self._ffi_handle = FfiHandle(stream_info.handle.id)
Expand All @@ -49,7 +49,7 @@ def __init__(
self._task.add_done_callback(task_done_logger)

def __del__(self) -> None:
ffi_client.queue.unsubscribe(self._ffi_queue)
FfiClient.instance.queue.unsubscribe(self._ffi_queue)

async def _run(self):
while True:
Expand All @@ -63,7 +63,7 @@ async def _run(self):
elif audio_event.HasField("eos"):
break

ffi_client.queue.unsubscribe(self._ffi_queue)
FfiClient.instance.queue.unsubscribe(self._ffi_queue)

async def aclose(self):
self._ffi_handle.dispose()
Expand Down
22 changes: 11 additions & 11 deletions livekit-rtc/livekit/rtc/e2ee.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from dataclasses import dataclass, field
from typing import List, Optional

from ._ffi_client import ffi_client
from ._ffi_client import FfiClient
from ._proto import e2ee_pb2 as proto_e2ee
from ._proto import ffi_pb2 as proto_ffi

Expand Down Expand Up @@ -52,13 +52,13 @@ def set_shared_key(self, key: bytes, key_index: int) -> None:
req.e2ee.room_handle = self._room_handle
req.e2ee.set_shared_key.key_index = key_index
req.e2ee.set_shared_key.shared_key = key
ffi_client.request(req)
FfiClient.instance.request(req)

def export_shared_key(self, key_index: int) -> bytes:
req = proto_ffi.FfiRequest()
req.e2ee.room_handle = self._room_handle
req.e2ee.get_shared_key.key_index = key_index
resp = ffi_client.request(req)
resp = FfiClient.instance.request(req)
key = resp.e2ee.get_shared_key.key
return key

Expand All @@ -67,7 +67,7 @@ def ratchet_shared_key(self, key_index: int) -> bytes:
req.e2ee.room_handle = self._room_handle
req.e2ee.ratchet_shared_key.key_index = key_index

resp = ffi_client.request(req)
resp = FfiClient.instance.request(req)

new_key = resp.e2ee.ratchet_shared_key.new_key
return new_key
Expand All @@ -80,14 +80,14 @@ def set_key(self, participant_identity: str, key: bytes, key_index: int) -> None
req.e2ee.set_key.key = key

self.key_index = key_index
ffi_client.request(req)
FfiClient.instance.request(req)

def export_key(self, participant_identity: str, key_index: int) -> bytes:
req = proto_ffi.FfiRequest()
req.e2ee.room_handle = self._room_handle
req.e2ee.get_key.participant_identity = participant_identity
req.e2ee.get_key.key_index = key_index
resp = ffi_client.request(req)
resp = FfiClient.instance.request(req)
key = resp.e2ee.get_key.key
return key

Expand All @@ -97,7 +97,7 @@ def ratchet_key(self, participant_identity: str, key_index: int) -> bytes:
req.e2ee.ratchet_key.participant_identity = participant_identity
req.e2ee.ratchet_key.key_index = key_index

resp = ffi_client.request(req)
resp = FfiClient.instance.request(req)
new_key = resp.e2ee.ratchet_key.new_key
return new_key

Expand Down Expand Up @@ -129,15 +129,15 @@ def set_enabled(self, enabled: bool) -> None:
req.e2ee.room_handle = self._room_handle
req.e2ee.cryptor_set_enabled.participant_identity = self._participant_identity
req.e2ee.cryptor_set_enabled.enabled = enabled
ffi_client.request(req)
FfiClient.instance.request(req)

def set_key_index(self, key_index: int) -> None:
self._key_index = key_index
req = proto_ffi.FfiRequest()
req.e2ee.room_handle = self._room_handle
req.e2ee.cryptor_set_key_index.participant_identity = self._participant_identity
req.e2ee.cryptor_set_key_index.key_index = key_index
ffi_client.request(req)
FfiClient.instance.request(req)


class E2EEManager:
Expand All @@ -164,13 +164,13 @@ def set_enabled(self, enabled: bool) -> None:
req = proto_ffi.FfiRequest()
req.e2ee.room_handle = self._room_handle
req.e2ee.manager_set_enabled.enabled = enabled
ffi_client.request(req)
FfiClient.instance.request(req)

def frame_cryptors(self) -> List[FrameCryptor]:
req = proto_ffi.FfiRequest()
req.e2ee.room_handle = self._room_handle

resp = ffi_client.request(req)
resp = FfiClient.instance.request(req)
frame_cryptors = []
for frame_cryptor in resp.e2ee.manager_get_frame_cryptors.frame_cryptors:
frame_cryptors.append(
Expand Down
24 changes: 12 additions & 12 deletions livekit-rtc/livekit/rtc/participant.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import ctypes
from typing import List, Union

from ._ffi_client import FfiHandle, ffi_client
from ._ffi_client import FfiHandle, FfiClient
from ._proto import ffi_pb2 as proto_ffi
from ._proto import participant_pb2 as proto_participant
from ._proto.room_pb2 import DataPacketKind, TrackPublishOptions
Expand Down Expand Up @@ -105,14 +105,14 @@ async def publish_data(

req.publish_data.destination_sids.extend(sids)

queue = ffi_client.queue.subscribe()
queue = FfiClient.instance.queue.subscribe()
try:
resp = ffi_client.request(req)
resp = FfiClient.instance.request(req)
cb = await queue.wait_for(
lambda e: e.publish_data.async_id == resp.publish_data.async_id
)
finally:
ffi_client.queue.unsubscribe(queue)
FfiClient.instance.queue.unsubscribe(queue)

if cb.publish_data.error:
raise PublishDataError(cb.publish_data.error)
Expand All @@ -122,30 +122,30 @@ async def update_metadata(self, metadata: str) -> None:
req.update_local_metadata.local_participant_handle = self._ffi_handle.handle
req.update_local_metadata.metadata = metadata

queue = ffi_client.queue.subscribe()
queue = FfiClient.instance.queue.subscribe()
try:
resp = ffi_client.request(req)
resp = FfiClient.instance.request(req)
await queue.wait_for(
lambda e: e.update_local_metadata.async_id
== resp.update_local_metadata.async_id
)
finally:
ffi_client.queue.unsubscribe(queue)
FfiClient.instance.queue.unsubscribe(queue)

async def update_name(self, name: str) -> None:
req = proto_ffi.FfiRequest()
req.update_local_name.local_participant_handle = self._ffi_handle.handle
req.update_local_name.name = name

queue = ffi_client.queue.subscribe()
queue = FfiClient.instance.queue.subscribe()
try:
resp = ffi_client.request(req)
resp = FfiClient.instance.request(req)
await queue.wait_for(
lambda e: e.update_local_name.async_id
== resp.update_local_name.async_id
)
finally:
ffi_client.queue.unsubscribe(queue)
FfiClient.instance.queue.unsubscribe(queue)

async def publish_track(
self, track: LocalTrack, options: TrackPublishOptions
Expand All @@ -157,7 +157,7 @@ async def publish_track(

queue = self._room_queue.subscribe()
try:
resp = ffi_client.request(req)
resp = FfiClient.instance.request(req)
cb = await queue.wait_for(
lambda e: e.publish_track.async_id == resp.publish_track.async_id
)
Expand All @@ -181,7 +181,7 @@ async def unpublish_track(self, track_sid: str) -> None:

queue = self._room_queue.subscribe()
try:
resp = ffi_client.request(req)
resp = FfiClient.instance.request(req)
cb = await queue.wait_for(
lambda e: e.unpublish_track.async_id == resp.unpublish_track.async_id
)
Expand Down
Loading
Loading