Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

more docstrings & add rtc.combine_audio_frames #268

Merged
merged 2 commits into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions livekit-rtc/livekit/rtc/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
from .video_source import VideoSource
from .video_stream import VideoFrameEvent, VideoStream
from .audio_resampler import AudioResampler, AudioResamplerQuality
from .utils import combine_audio_frames

__all__ = [
"ConnectionQuality",
Expand Down Expand Up @@ -130,5 +131,6 @@
"ChatMessage",
"AudioResampler",
"AudioResamplerQuality",
"combine_audio_frames",
"__version__",
]
53 changes: 44 additions & 9 deletions livekit-rtc/livekit/rtc/audio_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,15 @@


class AudioSource:
"""
Represents a real-time audio source with an internal audio queue.

The `AudioSource` class allows you to push audio frames into a real-time audio
source, managing an internal queue of audio data up to a maximum duration defined
by `queue_size_ms`. It supports asynchronous operations to capture audio frames
and to wait for the playback of all queued audio data.
"""

def __init__(
self,
sample_rate: int,
Expand All @@ -35,11 +44,12 @@ def __init__(
Initializes a new instance of the audio source.

Args:
sample_rate (int): The sample rate of the audio source in Hz
num_channels (int): The number of audio channels
sample_rate (int): The sample rate of the audio source in Hz.
num_channels (int): The number of audio channels.
queue_size_ms (int, optional): The buffer size of the audio queue in milliseconds.
Defaults to 1000 ms.
loop (asyncio.AbstractEventLoop, optional): The event loop to use. Defaults to asyncio.get_event_loop().
loop (asyncio.AbstractEventLoop, optional): The event loop to use. Defaults to
`asyncio.get_event_loop()`.
"""
self._sample_rate = sample_rate
self._num_channels = num_channels
Expand All @@ -63,29 +73,48 @@ def __init__(

@property
def sample_rate(self) -> int:
"""The sample rate of the audio source in Hz."""
return self._sample_rate

@property
def num_channels(self) -> int:
"""The number of audio channels."""
return self._num_channels

@property
def queued_duration(self) -> float:
"""The current duration (in seconds) of audio data queued for playback."""
return max(self._q_size - time.monotonic() + self._last_capture, 0.0)

def clear_queue(self) -> None:
    """
    Clears the internal audio queue, discarding all buffered audio data.

    This method immediately removes all audio data currently queued for playback,
    effectively resetting the audio source's buffer. Any audio frames that have been
    captured but not yet played will be discarded. This is useful in scenarios where
    you need to stop playback abruptly or prevent outdated audio data from being played.
    """
    # Build a clear_audio_buffer FFI request addressed to this source's handle.
    req = proto_ffi.FfiRequest()
    req.clear_audio_buffer.source_handle = self._ffi_handle.handle
    # The response is intentionally discarded.
    _ = FfiClient.instance.request(req)
    # NOTE(review): presumably unblocks anything awaiting playout now that the
    # queue is empty -- confirm against `_release_waiter`.
    self._release_waiter()

async def capture_frame(self, frame: AudioFrame) -> None:
"""Captures an AudioFrame.
"""
Captures an `AudioFrame` and queues it for playback.

Used to push new audio data into the published Track. Audio data will
be pushed in chunks of 10ms. It'll return only when all of the data in
the buffer has been pushed.
This method is used to push new audio data into the audio source. The audio data
will be processed and queued. If the size of the audio frame exceeds the internal
queue size, the method will wait until there is enough space in the queue to
accommodate the frame. The method returns only when all of the data in the buffer
has been pushed.

Args:
frame (AudioFrame): The audio frame to capture and queue.

Raises:
Exception: If there is an error during frame capture.
"""

now = time.monotonic()
Expand Down Expand Up @@ -123,7 +152,13 @@ async def capture_frame(self, frame: AudioFrame) -> None:
raise Exception(cb.capture_audio_frame.error)

async def wait_for_playout(self) -> None:
"""Waits for the audio source to finish playing out all audio data."""
"""
Waits for the audio source to finish playing out all audio data.

This method ensures that all queued audio data has been played out before returning.
It can be used to synchronize events after audio playback or to ensure that the
audio queue is empty.
"""

if self._join_fut is None:
return
Expand Down
88 changes: 87 additions & 1 deletion livekit-rtc/livekit/rtc/audio_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,22 @@

@dataclass
class AudioFrameEvent:
    """Event wrapping a single received audio frame.

    Attributes:
        frame (AudioFrame): The audio frame carried by this event.
    """

    frame: AudioFrame


class AudioStream:
"""AudioStream is a stream of audio frames received from a RemoteTrack."""
"""An asynchronous audio stream for receiving audio frames from a participant or track.

The `AudioStream` class provides an asynchronous iterator over audio frames received from
a specific track or participant. It allows you to receive audio frames in real-time with
customizable sample rates and channel configurations.
"""

def __init__(
self,
Expand All @@ -45,6 +56,32 @@ def __init__(
num_channels: int = 1,
**kwargs,
) -> None:
"""Initialize an `AudioStream` instance.

Args:
track (Optional[Track]): The audio track from which to receive audio. If not provided,
you must specify `participant` and `track_source` in `kwargs`.
loop (Optional[asyncio.AbstractEventLoop], optional): The event loop to use.
Defaults to the current event loop.
capacity (int, optional): The capacity of the internal frame queue. Defaults to 0 (unbounded).
sample_rate (int, optional): The sample rate for the audio stream in Hz.
Defaults to 48000.
num_channels (int, optional): The number of audio channels. Defaults to 1.
Example:
```python
audio_stream = AudioStream(
track=audio_track,
sample_rate=44100,
num_channels=2,
)

audio_stream = AudioStream.from_track(
track=audio_track,
sample_rate=44100,
num_channels=2,
)
```
"""
self._track: Track | None = track
self._sample_rate = sample_rate
self._num_channels = num_channels
Expand Down Expand Up @@ -76,6 +113,29 @@ def from_participant(
sample_rate: int = 48000,
num_channels: int = 1,
) -> AudioStream:
"""Create an `AudioStream` from a participant's audio track.

Args:
participant (Participant): The participant from whom to receive audio.
track_source (TrackSource.ValueType): The source of the audio track (e.g., microphone, screen share).
loop (Optional[asyncio.AbstractEventLoop], optional): The event loop to use. Defaults to the current event loop.
capacity (int, optional): The capacity of the internal frame queue. Defaults to 0 (unbounded).
sample_rate (int, optional): The sample rate for the audio stream in Hz. Defaults to 48000.
num_channels (int, optional): The number of audio channels. Defaults to 1.

Returns:
AudioStream: An instance of `AudioStream` that can be used to receive audio frames.

Example:
```python
audio_stream = AudioStream.from_participant(
participant=participant,
track_source=TrackSource.MICROPHONE,
sample_rate=24000,
num_channels=1,
)
```
"""
return AudioStream(
participant=participant,
track_source=track_source,
Expand All @@ -96,6 +156,27 @@ def from_track(
sample_rate: int = 48000,
num_channels: int = 1,
) -> AudioStream:
"""Create an `AudioStream` from an existing audio track.

Args:
track (Track): The audio track from which to receive audio.
loop (Optional[asyncio.AbstractEventLoop], optional): The event loop to use. Defaults to the current event loop.
capacity (int, optional): The capacity of the internal frame queue. Defaults to 0 (unbounded).
sample_rate (int, optional): The sample rate for the audio stream in Hz. Defaults to 48000.
num_channels (int, optional): The number of audio channels. Defaults to 1.

Returns:
AudioStream: An instance of `AudioStream` that can be used to receive audio frames.

Example:
```python
audio_stream = AudioStream.from_track(
track=audio_track,
sample_rate=44100,
num_channels=2,
)
```
"""
return AudioStream(
track=track,
loop=loop,
Expand Down Expand Up @@ -152,6 +233,11 @@ async def _run(self):
FfiClient.instance.queue.unsubscribe(self._ffi_queue)

async def aclose(self) -> None:
"""Asynchronously close the audio stream.

This method cleans up resources associated with the audio stream and waits for
any pending operations to complete.
"""
self._ffi_handle.dispose()
await self._task

Expand Down
Loading
Loading