Skip to content

Commit

Permalink
Instantiate FfiClient Lazily (#125)
Browse files Browse the repository at this point in the history
  • Loading branch information
keepingitneil authored Dec 15, 2023
1 parent 5d2bff6 commit 585a6c1
Show file tree
Hide file tree
Showing 13 changed files with 86 additions and 73 deletions.
15 changes: 10 additions & 5 deletions livekit-rtc/livekit/rtc/_ffi_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import pkg_resources

from ._proto import ffi_pb2 as proto_ffi
from ._utils import Queue
from ._utils import Queue, classproperty

logger = logging.getLogger("livekit")

Expand Down Expand Up @@ -142,7 +142,7 @@ def ffi_event_callback(

return # no need to queue the logs

ffi_client.queue.put(event)
FfiClient.instance.queue.put(event)


def to_python_level(level: proto_ffi.LogLevel.ValueType) -> int:
Expand All @@ -161,6 +161,14 @@ def to_python_level(level: proto_ffi.LogLevel.ValueType) -> int:


class FfiClient:
_instance: Optional["FfiClient"] = None

@classproperty
def instance(self):
if self._instance is None:
self._instance = FfiClient()
return self._instance

def __init__(self) -> None:
self._lock = threading.RLock()
self._queue = FfiQueue[proto_ffi.FfiEvent]()
Expand Down Expand Up @@ -188,6 +196,3 @@ def request(self, req: proto_ffi.FfiRequest) -> proto_ffi.FfiResponse:

FfiHandle(handle)
return resp


ffi_client = FfiClient()
8 changes: 8 additions & 0 deletions livekit-rtc/livekit/rtc/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@
logger = logging.getLogger("livekit")


class classproperty(object):
def __init__(self, f):
self.f = classmethod(f)

def __get__(self, *a):
return self.f.__get__(*a)()


def task_done_logger(task: asyncio.Task) -> None:
if task.cancelled():
logger.info("task cancelled: %s", task)
Expand Down
6 changes: 3 additions & 3 deletions livekit-rtc/livekit/rtc/audio_frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.

import ctypes
from ._ffi_client import FfiHandle, ffi_client
from ._ffi_client import FfiHandle, FfiClient
from ._proto import audio_frame_pb2 as proto_audio
from ._proto import ffi_pb2 as proto_ffi
from ._utils import get_address
Expand Down Expand Up @@ -65,7 +65,7 @@ def remix_and_resample(self, sample_rate: int, num_channels: int) -> "AudioFrame
req = proto_ffi.FfiRequest()
req.new_audio_resampler.CopyFrom(proto_audio.NewAudioResamplerRequest())

resp = ffi_client.request(req)
resp = FfiClient.instance.request(req)
resampler_handle = FfiHandle(resp.new_audio_resampler.resampler.handle.id)

resample_req = proto_ffi.FfiRequest()
Expand All @@ -74,7 +74,7 @@ def remix_and_resample(self, sample_rate: int, num_channels: int) -> "AudioFrame
resample_req.remix_and_resample.sample_rate = sample_rate
resample_req.remix_and_resample.num_channels = num_channels

resp = ffi_client.request(resample_req)
resp = FfiClient.instance.request(resample_req)
return AudioFrame._from_owned_info(resp.remix_and_resample.buffer)

def _proto_info(self) -> proto_audio.AudioFrameBufferInfo:
Expand Down
10 changes: 5 additions & 5 deletions livekit-rtc/livekit/rtc/audio_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from ._ffi_client import FfiHandle, ffi_client
from ._ffi_client import FfiHandle, FfiClient
from ._proto import audio_frame_pb2 as proto_audio_frame
from ._proto import ffi_pb2 as proto_ffi
from .audio_frame import AudioFrame
Expand All @@ -27,7 +27,7 @@ def __init__(self, sample_rate: int, num_channels: int) -> None:
req.new_audio_source.sample_rate = sample_rate
req.new_audio_source.num_channels = num_channels

resp = ffi_client.request(req)
resp = FfiClient.instance.request(req)
self._info = resp.new_audio_source.source
self._ffi_handle = FfiHandle(self._info.handle.id)

Expand All @@ -37,15 +37,15 @@ async def capture_frame(self, frame: AudioFrame) -> None:
req.capture_audio_frame.source_handle = self._ffi_handle.handle
req.capture_audio_frame.buffer.CopyFrom(frame._proto_info())

queue = ffi_client.queue.subscribe()
queue = FfiClient.instance.queue.subscribe()
try:
resp = ffi_client.request(req)
resp = FfiClient.instance.request(req)
cb = await queue.wait_for(
lambda e: e.capture_audio_frame.async_id
== resp.capture_audio_frame.async_id
)
finally:
ffi_client.queue.unsubscribe(queue)
FfiClient.instance.queue.unsubscribe(queue)

if cb.capture_audio_frame.error:
raise Exception(cb.capture_audio_frame.error)
10 changes: 5 additions & 5 deletions livekit-rtc/livekit/rtc/audio_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import asyncio
from typing import Optional

from ._ffi_client import FfiHandle, ffi_client
from ._ffi_client import FfiHandle, FfiClient
from ._proto import audio_frame_pb2 as proto_audio_frame
from ._proto import ffi_pb2 as proto_ffi
from ._utils import RingQueue, task_done_logger
Expand All @@ -32,14 +32,14 @@ def __init__(
) -> None:
self._track = track
self._loop = loop or asyncio.get_event_loop()
self._ffi_queue = ffi_client.queue.subscribe(self._loop)
self._ffi_queue = FfiClient.instance.queue.subscribe(self._loop)
self._queue: RingQueue[AudioFrame] = RingQueue(capacity)

req = proto_ffi.FfiRequest()
new_audio_stream = req.new_audio_stream
new_audio_stream.track_handle = track._ffi_handle.handle
new_audio_stream.type = proto_audio_frame.AudioStreamType.AUDIO_STREAM_NATIVE
resp = ffi_client.request(req)
resp = FfiClient.instance.request(req)

stream_info = resp.new_audio_stream.stream
self._ffi_handle = FfiHandle(stream_info.handle.id)
Expand All @@ -49,7 +49,7 @@ def __init__(
self._task.add_done_callback(task_done_logger)

def __del__(self) -> None:
ffi_client.queue.unsubscribe(self._ffi_queue)
FfiClient.instance.queue.unsubscribe(self._ffi_queue)

async def _run(self):
while True:
Expand All @@ -63,7 +63,7 @@ async def _run(self):
elif audio_event.HasField("eos"):
break

ffi_client.queue.unsubscribe(self._ffi_queue)
FfiClient.instance.queue.unsubscribe(self._ffi_queue)

async def aclose(self):
self._ffi_handle.dispose()
Expand Down
22 changes: 11 additions & 11 deletions livekit-rtc/livekit/rtc/e2ee.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from dataclasses import dataclass, field
from typing import List, Optional

from ._ffi_client import ffi_client
from ._ffi_client import FfiClient
from ._proto import e2ee_pb2 as proto_e2ee
from ._proto import ffi_pb2 as proto_ffi

Expand Down Expand Up @@ -52,13 +52,13 @@ def set_shared_key(self, key: bytes, key_index: int) -> None:
req.e2ee.room_handle = self._room_handle
req.e2ee.set_shared_key.key_index = key_index
req.e2ee.set_shared_key.shared_key = key
ffi_client.request(req)
FfiClient.instance.request(req)

def export_shared_key(self, key_index: int) -> bytes:
req = proto_ffi.FfiRequest()
req.e2ee.room_handle = self._room_handle
req.e2ee.get_shared_key.key_index = key_index
resp = ffi_client.request(req)
resp = FfiClient.instance.request(req)
key = resp.e2ee.get_shared_key.key
return key

Expand All @@ -67,7 +67,7 @@ def ratchet_shared_key(self, key_index: int) -> bytes:
req.e2ee.room_handle = self._room_handle
req.e2ee.ratchet_shared_key.key_index = key_index

resp = ffi_client.request(req)
resp = FfiClient.instance.request(req)

new_key = resp.e2ee.ratchet_shared_key.new_key
return new_key
Expand All @@ -80,14 +80,14 @@ def set_key(self, participant_identity: str, key: bytes, key_index: int) -> None
req.e2ee.set_key.key = key

self.key_index = key_index
ffi_client.request(req)
FfiClient.instance.request(req)

def export_key(self, participant_identity: str, key_index: int) -> bytes:
req = proto_ffi.FfiRequest()
req.e2ee.room_handle = self._room_handle
req.e2ee.get_key.participant_identity = participant_identity
req.e2ee.get_key.key_index = key_index
resp = ffi_client.request(req)
resp = FfiClient.instance.request(req)
key = resp.e2ee.get_key.key
return key

Expand All @@ -97,7 +97,7 @@ def ratchet_key(self, participant_identity: str, key_index: int) -> bytes:
req.e2ee.ratchet_key.participant_identity = participant_identity
req.e2ee.ratchet_key.key_index = key_index

resp = ffi_client.request(req)
resp = FfiClient.instance.request(req)
new_key = resp.e2ee.ratchet_key.new_key
return new_key

Expand Down Expand Up @@ -129,15 +129,15 @@ def set_enabled(self, enabled: bool) -> None:
req.e2ee.room_handle = self._room_handle
req.e2ee.cryptor_set_enabled.participant_identity = self._participant_identity
req.e2ee.cryptor_set_enabled.enabled = enabled
ffi_client.request(req)
FfiClient.instance.request(req)

def set_key_index(self, key_index: int) -> None:
self._key_index = key_index
req = proto_ffi.FfiRequest()
req.e2ee.room_handle = self._room_handle
req.e2ee.cryptor_set_key_index.participant_identity = self._participant_identity
req.e2ee.cryptor_set_key_index.key_index = key_index
ffi_client.request(req)
FfiClient.instance.request(req)


class E2EEManager:
Expand All @@ -164,13 +164,13 @@ def set_enabled(self, enabled: bool) -> None:
req = proto_ffi.FfiRequest()
req.e2ee.room_handle = self._room_handle
req.e2ee.manager_set_enabled.enabled = enabled
ffi_client.request(req)
FfiClient.instance.request(req)

def frame_cryptors(self) -> List[FrameCryptor]:
req = proto_ffi.FfiRequest()
req.e2ee.room_handle = self._room_handle

resp = ffi_client.request(req)
resp = FfiClient.instance.request(req)
frame_cryptors = []
for frame_cryptor in resp.e2ee.manager_get_frame_cryptors.frame_cryptors:
frame_cryptors.append(
Expand Down
24 changes: 12 additions & 12 deletions livekit-rtc/livekit/rtc/participant.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import ctypes
from typing import List, Union

from ._ffi_client import FfiHandle, ffi_client
from ._ffi_client import FfiHandle, FfiClient
from ._proto import ffi_pb2 as proto_ffi
from ._proto import participant_pb2 as proto_participant
from ._proto.room_pb2 import DataPacketKind, TrackPublishOptions
Expand Down Expand Up @@ -105,14 +105,14 @@ async def publish_data(

req.publish_data.destination_sids.extend(sids)

queue = ffi_client.queue.subscribe()
queue = FfiClient.instance.queue.subscribe()
try:
resp = ffi_client.request(req)
resp = FfiClient.instance.request(req)
cb = await queue.wait_for(
lambda e: e.publish_data.async_id == resp.publish_data.async_id
)
finally:
ffi_client.queue.unsubscribe(queue)
FfiClient.instance.queue.unsubscribe(queue)

if cb.publish_data.error:
raise PublishDataError(cb.publish_data.error)
Expand All @@ -122,30 +122,30 @@ async def update_metadata(self, metadata: str) -> None:
req.update_local_metadata.local_participant_handle = self._ffi_handle.handle
req.update_local_metadata.metadata = metadata

queue = ffi_client.queue.subscribe()
queue = FfiClient.instance.queue.subscribe()
try:
resp = ffi_client.request(req)
resp = FfiClient.instance.request(req)
await queue.wait_for(
lambda e: e.update_local_metadata.async_id
== resp.update_local_metadata.async_id
)
finally:
ffi_client.queue.unsubscribe(queue)
FfiClient.instance.queue.unsubscribe(queue)

async def update_name(self, name: str) -> None:
req = proto_ffi.FfiRequest()
req.update_local_name.local_participant_handle = self._ffi_handle.handle
req.update_local_name.name = name

queue = ffi_client.queue.subscribe()
queue = FfiClient.instance.queue.subscribe()
try:
resp = ffi_client.request(req)
resp = FfiClient.instance.request(req)
await queue.wait_for(
lambda e: e.update_local_name.async_id
== resp.update_local_name.async_id
)
finally:
ffi_client.queue.unsubscribe(queue)
FfiClient.instance.queue.unsubscribe(queue)

async def publish_track(
self, track: LocalTrack, options: TrackPublishOptions
Expand All @@ -157,7 +157,7 @@ async def publish_track(

queue = self._room_queue.subscribe()
try:
resp = ffi_client.request(req)
resp = FfiClient.instance.request(req)
cb = await queue.wait_for(
lambda e: e.publish_track.async_id == resp.publish_track.async_id
)
Expand All @@ -181,7 +181,7 @@ async def unpublish_track(self, track_sid: str) -> None:

queue = self._room_queue.subscribe()
try:
resp = ffi_client.request(req)
resp = FfiClient.instance.request(req)
cb = await queue.wait_for(
lambda e: e.unpublish_track.async_id == resp.unpublish_track.async_id
)
Expand Down
Loading

0 comments on commit 585a6c1

Please sign in to comment.