diff --git a/livekit-rtc/livekit/rtc/__init__.py b/livekit-rtc/livekit/rtc/__init__.py index 4f101fac..9060631f 100644 --- a/livekit-rtc/livekit/rtc/__init__.py +++ b/livekit-rtc/livekit/rtc/__init__.py @@ -70,6 +70,7 @@ from .video_source import VideoSource from .video_stream import VideoFrameEvent, VideoStream from .audio_resampler import AudioResampler, AudioResamplerQuality +from .utils import combine_audio_frames __all__ = [ "ConnectionQuality", @@ -130,5 +131,6 @@ "ChatMessage", "AudioResampler", "AudioResamplerQuality", + "combine_audio_frames", "__version__", ] diff --git a/livekit-rtc/livekit/rtc/audio_source.py b/livekit-rtc/livekit/rtc/audio_source.py index 5ac063b8..bb287c7f 100644 --- a/livekit-rtc/livekit/rtc/audio_source.py +++ b/livekit-rtc/livekit/rtc/audio_source.py @@ -24,6 +24,15 @@ class AudioSource: + """ + Represents a real-time audio source with an internal audio queue. + + The `AudioSource` class allows you to push audio frames into a real-time audio + source, managing an internal queue of audio data up to a maximum duration defined + by `queue_size_ms`. It supports asynchronous operations to capture audio frames + and to wait for the playback of all queued audio data. + """ + def __init__( self, sample_rate: int, @@ -35,11 +44,12 @@ def __init__( Initializes a new instance of the audio source. Args: - sample_rate (int): The sample rate of the audio source in Hz - num_channels (int): The number of audio channels + sample_rate (int): The sample rate of the audio source in Hz. + num_channels (int): The number of audio channels. queue_size_ms (int, optional): The buffer size of the audio queue in milliseconds. Defaults to 1000 ms. - loop (asyncio.AbstractEventLoop, optional): The event loop to use. Defaults to asyncio.get_event_loop(). + loop (asyncio.AbstractEventLoop, optional): The event loop to use. Defaults to + `asyncio.get_event_loop()`. """ self._sample_rate = sample_rate self._num_channels = num_channels @@ -63,29 +73,48 @@ def __init__( @property def sample_rate(self) -> int: + """The sample rate of the audio source in Hz.""" return self._sample_rate @property def num_channels(self) -> int: + """The number of audio channels.""" return self._num_channels @property def queued_duration(self) -> float: + """The current duration (in seconds) of audio data queued for playback.""" return max(self._q_size - time.monotonic() + self._last_capture, 0.0) def clear_queue(self) -> None: - """Clears the audio queue, discarding all buffered audio data.""" + """ + Clears the internal audio queue, discarding all buffered audio data. + + This method immediately removes all audio data currently queued for playback, + effectively resetting the audio source's buffer. Any audio frames that have been + captured but not yet played will be discarded. This is useful in scenarios where + you need to stop playback abruptly or prevent outdated audio data from being played. + """ req = proto_ffi.FfiRequest() req.clear_audio_buffer.source_handle = self._ffi_handle.handle _ = FfiClient.instance.request(req) self._release_waiter() async def capture_frame(self, frame: AudioFrame) -> None: - """Captures an AudioFrame. + """ + Captures an `AudioFrame` and queues it for playback. - Used to push new audio data into the published Track. Audio data will - be pushed in chunks of 10ms. It'll return only when all of the data in - the buffer has been pushed. + This method is used to push new audio data into the audio source. The audio data + will be processed and queued. If the size of the audio frame exceeds the internal + queue size, the method will wait until there is enough space in the queue to + accommodate the frame. The method returns only when all of the data in the buffer + has been pushed. + + Args: + frame (AudioFrame): The audio frame to capture and queue. + + Raises: + Exception: If there is an error during frame capture. """ now = time.monotonic() @@ -123,7 +152,13 @@ async def capture_frame(self, frame: AudioFrame) -> None: raise Exception(cb.capture_audio_frame.error) async def wait_for_playout(self) -> None: - """Waits for the audio source to finish playing out all audio data.""" + """ + Waits for the audio source to finish playing out all audio data. + + This method ensures that all queued audio data has been played out before returning. + It can be used to synchronize events after audio playback or to ensure that the + audio queue is empty. + """ if self._join_fut is None: return diff --git a/livekit-rtc/livekit/rtc/audio_stream.py b/livekit-rtc/livekit/rtc/audio_stream.py index ce64eb99..fde373de 100644 --- a/livekit-rtc/livekit/rtc/audio_stream.py +++ b/livekit-rtc/livekit/rtc/audio_stream.py @@ -30,11 +30,22 @@ @dataclass class AudioFrameEvent: + """An event representing a received audio frame. + + Attributes: + frame (AudioFrame): The received audio frame. + """ + frame: AudioFrame class AudioStream: - """AudioStream is a stream of audio frames received from a RemoteTrack.""" + """An asynchronous audio stream for receiving audio frames from a participant or track. + + The `AudioStream` class provides an asynchronous iterator over audio frames received from + a specific track or participant. It allows you to receive audio frames in real-time with + customizable sample rates and channel configurations. + """ def __init__( self, @@ -45,6 +56,32 @@ def __init__( num_channels: int = 1, **kwargs, ) -> None: + """Initialize an `AudioStream` instance. + + Args: + track (Optional[Track]): The audio track from which to receive audio. If not provided, + you must specify `participant` and `track_source` in `kwargs`. + loop (Optional[asyncio.AbstractEventLoop], optional): The event loop to use. + Defaults to the current event loop. + capacity (int, optional): The capacity of the internal frame queue. Defaults to 0 (unbounded). + sample_rate (int, optional): The sample rate for the audio stream in Hz. + Defaults to 48000. + num_channels (int, optional): The number of audio channels. Defaults to 1. + Example: + ```python + audio_stream = AudioStream( + track=audio_track, + sample_rate=44100, + num_channels=2, + ) + + audio_stream = AudioStream.from_track( + track=audio_track, + sample_rate=44100, + num_channels=2, + ) + ``` + """ self._track: Track | None = track self._sample_rate = sample_rate self._num_channels = num_channels @@ -76,6 +113,29 @@ def from_participant( sample_rate: int = 48000, num_channels: int = 1, ) -> AudioStream: + """Create an `AudioStream` from a participant's audio track. + + Args: + participant (Participant): The participant from whom to receive audio. + track_source (TrackSource.ValueType): The source of the audio track (e.g., microphone, screen share). + loop (Optional[asyncio.AbstractEventLoop], optional): The event loop to use. Defaults to the current event loop. + capacity (int, optional): The capacity of the internal frame queue. Defaults to 0 (unbounded). + sample_rate (int, optional): The sample rate for the audio stream in Hz. Defaults to 48000. + num_channels (int, optional): The number of audio channels. Defaults to 1. + + Returns: + AudioStream: An instance of `AudioStream` that can be used to receive audio frames. + + Example: + ```python + audio_stream = AudioStream.from_participant( + participant=participant, + track_source=TrackSource.MICROPHONE, + sample_rate=24000, + num_channels=1, + ) + ``` + """ return AudioStream( participant=participant, track_source=track_source, @@ -96,6 +156,27 @@ def from_track( sample_rate: int = 48000, num_channels: int = 1, ) -> AudioStream: + """Create an `AudioStream` from an existing audio track. + + Args: + track (Track): The audio track from which to receive audio. + loop (Optional[asyncio.AbstractEventLoop], optional): The event loop to use. Defaults to the current event loop. + capacity (int, optional): The capacity of the internal frame queue. Defaults to 0 (unbounded). + sample_rate (int, optional): The sample rate for the audio stream in Hz. Defaults to 48000. + num_channels (int, optional): The number of audio channels. Defaults to 1. + + Returns: + AudioStream: An instance of `AudioStream` that can be used to receive audio frames. + + Example: + ```python + audio_stream = AudioStream.from_track( + track=audio_track, + sample_rate=44100, + num_channels=2, + ) + ``` + """ return AudioStream( track=track, loop=loop, @@ -152,6 +233,11 @@ async def _run(self): FfiClient.instance.queue.unsubscribe(self._ffi_queue) async def aclose(self) -> None: + """Asynchronously close the audio stream. + + This method cleans up resources associated with the audio stream and waits for + any pending operations to complete. + """ self._ffi_handle.dispose() await self._task diff --git a/livekit-rtc/livekit/rtc/e2ee.py b/livekit-rtc/livekit/rtc/e2ee.py index 3b296bd0..58cf7e3b 100644 --- a/livekit-rtc/livekit/rtc/e2ee.py +++ b/livekit-rtc/livekit/rtc/e2ee.py @@ -48,6 +48,17 @@ def options(self) -> KeyProviderOptions: return self._options def set_shared_key(self, key: bytes, key_index: int) -> None: + """Sets the shared encryption key. + + Parameters: + key (bytes): The new shared key. + key_index (int): The index of the key. + + Example: + ```python + key_provider.set_shared_key(b"my_shared_key", key_index=1) + ``` + """ req = proto_ffi.FfiRequest() req.e2ee.room_handle = self._room_handle req.e2ee.set_shared_key.key_index = key_index @@ -55,6 +66,19 @@ def set_shared_key(self, key: bytes, key_index: int) -> None: FfiClient.instance.request(req) def export_shared_key(self, key_index: int) -> bytes: + """Exports the shared encryption key. + + Parameters: + key_index (int): The index of the key to export. + + Returns: + bytes: The exported shared key. + + Example: + ```python + key = key_provider.export_shared_key(key_index=1) + ``` + """ req = proto_ffi.FfiRequest() req.e2ee.room_handle = self._room_handle req.e2ee.get_shared_key.key_index = key_index @@ -63,6 +87,19 @@ def export_shared_key(self, key_index: int) -> bytes: return key def ratchet_shared_key(self, key_index: int) -> bytes: + """Ratchets the shared encryption key to a new key. + + Parameters: + key_index (int): The index of the key to ratchet. + + Returns: + bytes: The new ratcheted shared key. + + Example: + ```python + new_key = key_provider.ratchet_shared_key(key_index=1) + ``` + """ req = proto_ffi.FfiRequest() req.e2ee.room_handle = self._room_handle req.e2ee.ratchet_shared_key.key_index = key_index @@ -73,6 +110,18 @@ def ratchet_shared_key(self, key_index: int) -> bytes: return new_key def set_key(self, participant_identity: str, key: bytes, key_index: int) -> None: + """Sets the encryption key for a specific participant. + + Parameters: + participant_identity (str): The identity of the participant. + key (bytes): The encryption key to set. + key_index (int): The index of the key. + + Example: + ```python + key_provider.set_key("participant123", b"participant_key", key_index=2) + ``` + """ req = proto_ffi.FfiRequest() req.e2ee.room_handle = self._room_handle req.e2ee.set_key.participant_identity = participant_identity @@ -83,6 +132,20 @@ def set_key(self, participant_identity: str, key: bytes, key_index: int) -> None FfiClient.instance.request(req) def export_key(self, participant_identity: str, key_index: int) -> bytes: + """Exports the encryption key for a specific participant. + + Parameters: + participant_identity (str): The identity of the participant. + key_index (int): The index of the key to export. + + Returns: + bytes: The exported key. + + Example: + ```python + key = key_provider.export_key("participant123", key_index=2) + ``` + """ req = proto_ffi.FfiRequest() req.e2ee.room_handle = self._room_handle req.e2ee.get_key.participant_identity = participant_identity @@ -92,6 +155,20 @@ def export_key(self, participant_identity: str, key_index: int) -> bytes: return key def ratchet_key(self, participant_identity: str, key_index: int) -> bytes: + """Ratchets the encryption key for a specific participant to a new key. + + Parameters: + participant_identity (str): The identity of the participant. + key_index (int): The index of the key to ratchet. + + Returns: + bytes: The new ratcheted key. + + Example: + ```python + new_key = key_provider.ratchet_key("participant123", key_index=2) + ``` + """ req = proto_ffi.FfiRequest() req.e2ee.room_handle = self._room_handle req.e2ee.ratchet_key.participant_identity = participant_identity @@ -124,6 +201,16 @@ def enabled(self) -> bool: return self._enabled def set_enabled(self, enabled: bool) -> None: + """Enables or disables frame encryption. + + Parameters: + enabled (bool): True to enable, False to disable. + + Example: + ```python + frame_cryptor.set_enabled(True) + ``` + """ self._enabled = enabled req = proto_ffi.FfiRequest() req.e2ee.room_handle = self._room_handle @@ -132,6 +219,16 @@ def set_enabled(self, enabled: bool) -> None: FfiClient.instance.request(req) def set_key_index(self, key_index: int) -> None: + """Sets the key index for encryption/decryption. + + Parameters: + key_index (int): The new key index. + + Example: + ```python + frame_cryptor.set_key_index(3) + ``` + """ self._key_index = key_index req = proto_ffi.FfiRequest() req.e2ee.room_handle = self._room_handle @@ -160,6 +257,16 @@ def enabled(self) -> bool: return self._enabled def set_enabled(self, enabled: bool) -> None: + """Enables or disables end-to-end encryption. + + Parameters: + enabled (bool): True to enable, False to disable. + + Example: + ```python + e2ee_manager.set_enabled(True) + ``` + """ self._enabled = enabled req = proto_ffi.FfiRequest() req.e2ee.room_handle = self._room_handle @@ -167,6 +274,18 @@ def set_enabled(self, enabled: bool) -> None: FfiClient.instance.request(req) def frame_cryptors(self) -> List[FrameCryptor]: + """Retrieves the list of frame cryptors for participants. + + Returns: + List[FrameCryptor]: A list of FrameCryptor instances. + + Example: + ```python + cryptors = e2ee_manager.frame_cryptors() + for cryptor in cryptors: + print(cryptor.participant_identity) + ``` + """ req = proto_ffi.FfiRequest() req.e2ee.room_handle = self._room_handle diff --git a/livekit-rtc/livekit/rtc/participant.py b/livekit-rtc/livekit/rtc/participant.py index 49be9395..71b6eb17 100644 --- a/livekit-rtc/livekit/rtc/participant.py +++ b/livekit-rtc/livekit/rtc/participant.py @@ -80,14 +80,18 @@ def metadata(self) -> str: @property def attributes(self) -> dict[str, str]: + """Custom attributes associated with the participant.""" return dict(self._info.attributes) @property def kind(self) -> proto_participant.ParticipantKind.ValueType: + """Participant's kind (e.g., regular participant, ingress, egress, sip, agent).""" return self._info.kind class LocalParticipant(Participant): + """Represents the local participant in a room.""" + def __init__( self, room_queue: BroadcastQueue[proto_ffi.FfiEvent], @@ -105,6 +109,18 @@ async def publish_data( destination_identities: List[str] = [], topic: str = "", ) -> None: + """ + Publish arbitrary data to the room. + + Args: + payload (Union[bytes, str]): The data to publish. + reliable (bool, optional): Whether to send reliably or not. Defaults to True. + destination_identities (List[str], optional): List of participant identities to send to. Defaults to []. + topic (str, optional): The topic under which to publish the data. Defaults to "". + + Raises: + PublishDataError: If there is an error in publishing data. + """ if isinstance(payload, str): payload = payload.encode("utf-8") @@ -132,6 +148,15 @@ async def publish_data( raise PublishDataError(cb.publish_data.error) async def publish_transcription(self, transcription: Transcription) -> None: + """ + Publish transcription data to the room. + + Args: + transcription (Transcription): The transcription data to publish. + + Raises: + PublishTranscriptionError: If there is an error in publishing transcription. + """ req = proto_ffi.FfiRequest() proto_segments = [ ProtoTranscriptionSegment( @@ -164,6 +189,14 @@ async def publish_transcription(self, transcription: Transcription) -> None: raise PublishTranscriptionError(cb.publish_transcription.error) async def set_metadata(self, metadata: str) -> None: + """ + Set the metadata for the local participant. + + Note: this requires `canUpdateOwnMetadata` permission. + + Args: + metadata (str): The new metadata. + """ req = proto_ffi.FfiRequest() req.set_local_metadata.local_participant_handle = self._ffi_handle.handle req.set_local_metadata.metadata = metadata @@ -179,6 +212,14 @@ async def set_metadata(self, metadata: str) -> None: FfiClient.instance.queue.unsubscribe(queue) async def set_name(self, name: str) -> None: + """ + Set the name for the local participant. + + Note: this requires `canUpdateOwnMetadata` permission. + + Args: + name (str): The new name. + """ req = proto_ffi.FfiRequest() req.set_local_name.local_participant_handle = self._ffi_handle.handle req.set_local_name.name = name @@ -193,6 +234,14 @@ async def set_name(self, name: str) -> None: FfiClient.instance.queue.unsubscribe(queue) async def set_attributes(self, attributes: dict[str, str]) -> None: + """ + Set custom attributes for the local participant. + + Note: this requires `canUpdateOwnMetadata` permission. + + Args: + attributes (dict[str, str]): A dictionary of attributes to set. + """ req = proto_ffi.FfiRequest() req.set_local_attributes.local_participant_handle = self._ffi_handle.handle req.set_local_attributes.attributes.update(attributes) @@ -210,6 +259,19 @@ async def set_attributes(self, attributes: dict[str, str]) -> None: async def publish_track( self, track: LocalTrack, options: TrackPublishOptions = TrackPublishOptions() ) -> LocalTrackPublication: + """ + Publish a local track to the room. + + Args: + track (LocalTrack): The track to publish. + options (TrackPublishOptions, optional): Options for publishing the track. + + Returns: + LocalTrackPublication: The publication of the published track. + + Raises: + PublishTrackError: If there is an error in publishing the track. + """ req = proto_ffi.FfiRequest() req.publish_track.track_handle = track._ffi_handle.handle req.publish_track.local_participant_handle = self._ffi_handle.handle @@ -236,6 +298,15 @@ async def publish_track( self._room_queue.unsubscribe(queue) async def unpublish_track(self, track_sid: str) -> None: + """ + Unpublish a track from the room. + + Args: + track_sid (str): The SID of the track to unpublish. + + Raises: + UnpublishTrackError: If there is an error in unpublishing the track. + """ req = proto_ffi.FfiRequest() req.unpublish_track.local_participant_handle = self._ffi_handle.handle req.unpublish_track.track_sid = track_sid diff --git a/livekit-rtc/livekit/rtc/room.py b/livekit-rtc/livekit/rtc/room.py index 82fd2958..64a157cb 100644 --- a/livekit-rtc/livekit/rtc/room.py +++ b/livekit-rtc/livekit/rtc/room.py @@ -17,7 +17,7 @@ import ctypes import logging from dataclasses import dataclass, field -from typing import Dict, Literal, Optional, cast +from typing import Callable, Dict, Literal, Optional, cast from ._event_emitter import EventEmitter from ._ffi_client import FfiClient, FfiHandle @@ -69,37 +69,47 @@ class RtcConfiguration: ice_transport_type: proto_room.IceTransportType.ValueType = ( proto_room.IceTransportType.TRANSPORT_ALL ) + """Specifies the type of ICE transport to be used (e.g., all, relay, etc.).""" continual_gathering_policy: proto_room.ContinualGatheringPolicy.ValueType = ( proto_room.ContinualGatheringPolicy.GATHER_CONTINUALLY ) + """Policy for continual gathering of ICE candidates.""" ice_servers: list[proto_room.IceServer] = field(default_factory=list) + """List of ICE servers for STUN/TURN. When empty, it uses the default ICE servers provided by + the SFU.""" @dataclass class RoomOptions: auto_subscribe: bool = True + """Automatically subscribe to tracks when participants join.""" dynacast: bool = False - e2ee: Optional[E2EEOptions] = None - rtc_config: Optional[RtcConfiguration] = None + e2ee: E2EEOptions | None = None + """Options for end-to-end encryption.""" + rtc_config: RtcConfiguration | None = None + """WebRTC-related configuration.""" @dataclass class DataPacket: data: bytes + """The payload of the data packet.""" kind: proto_room.DataPacketKind.ValueType - participant: Optional[RemoteParticipant] = ( - None # None when the data has been sent by a server SDK - ) - topic: Optional[str] = None + """Type of the data packet (e.g., RELIABLE, LOSSY).""" + participant: RemoteParticipant | None + """Participant who sent the data. None when sent by a server SDK.""" + topic: str | None = None + """Topic associated with the data packet.""" @dataclass class SipDTMF: code: int + """DTMF code corresponding to the digit.""" digit: str - participant: Optional[RemoteParticipant] = ( - None # None when the data has been sent by a server SDK - ) + """DTMF digit sent.""" + participant: RemoteParticipant | None = None + """Participant who sent the DTMF digit. None when sent by a server SDK.""" class ConnectError(Exception): @@ -109,6 +119,11 @@ def __init__(self, message: str): class Room(EventEmitter[EventTypes]): def __init__(self, loop: Optional[asyncio.AbstractEventLoop] = None) -> None: + """Initializes a new Room instance. + + Parameters: + loop (Optional[asyncio.AbstractEventLoop]): The event loop to use. If not provided, the default event loop is used. + """ super().__init__() self._ffi_handle: Optional[FfiHandle] = None @@ -126,6 +141,11 @@ def __del__(self) -> None: @property async def sid(self) -> str: + """Asynchronously retrieves the session ID (SID) of the room. + + Returns: + str: The session ID of the room. + """ if self._info.sid: return self._info.sid @@ -133,25 +153,143 @@ async def sid(self) -> str: @property def name(self) -> str: + """Gets the name of the room. + + Returns: + str: The name of the room. + """ return self._info.name @property def metadata(self) -> str: + """Gets the metadata associated with the room. + + Returns: + str: The metadata of the room. + """ return self._info.metadata @property def e2ee_manager(self) -> E2EEManager: + """Gets the end-to-end encryption (E2EE) manager for the room. + + Returns: + E2EEManager: The E2EE manager instance. + """ return self._e2ee_manager def isconnected(self) -> bool: + """Checks if the room is currently connected. + + Returns: + bool: True if connected, False otherwise. + """ return ( self._ffi_handle is not None and self.connection_state != ConnectionState.CONN_DISCONNECTED ) + def on(self, event: EventTypes, callback: Optional[Callable] = None) -> Callable: + """Registers an event handler for a specific event type. + + Parameters: + event (EventTypes): The name of the event to listen for. + callback (Callable): The function to call when the event occurs. + + Returns: + Callable: The registered callback function. + + Available events: + - **"participant_connected"**: Called when a new participant joins the room. + - Arguments: `participant` (RemoteParticipant) + - **"participant_disconnected"**: Called when a participant leaves the room. + - Arguments: `participant` (RemoteParticipant) + - **"local_track_published"**: Called when a local track is published. + - Arguments: `publication` (LocalTrackPublication), `track` (Track) + - **"local_track_unpublished"**: Called when a local track is unpublished. + - Arguments: `publication` (LocalTrackPublication) + - **"local_track_subscribed"**: Called when a local track is subscribed. + - Arguments: `track` (Track) + - **"track_published"**: Called when a remote participant publishes a track. + - Arguments: `publication` (RemoteTrackPublication), `participant` (RemoteParticipant) + - **"track_unpublished"**: Called when a remote participant unpublishes a track. + - Arguments: `publication` (RemoteTrackPublication), `participant` (RemoteParticipant) + - **"track_subscribed"**: Called when a track is subscribed. + - Arguments: `track` (Track), `publication` (RemoteTrackPublication), `participant` (RemoteParticipant) + - **"track_unsubscribed"**: Called when a track is unsubscribed. + - Arguments: `track` (Track), `publication` (RemoteTrackPublication), `participant` (RemoteParticipant) + - **"track_subscription_failed"**: Called when a track subscription fails. + - Arguments: `participant` (RemoteParticipant), `track_sid` (str), `error` (str) + - **"track_muted"**: Called when a track is muted. + - Arguments: `participant` (Participant), `publication` (TrackPublication) + - **"track_unmuted"**: Called when a track is unmuted. + - Arguments: `participant` (Participant), `publication` (TrackPublication) + - **"active_speakers_changed"**: Called when the list of active speakers changes. + - Arguments: `speakers` (list[Participant]) + - **"room_metadata_changed"**: Called when the room's metadata is updated. + - Arguments: `old_metadata` (str), `new_metadata` (str) + - **"participant_metadata_changed"**: Called when a participant's metadata is updated. + - Arguments: `participant` (Participant), `old_metadata` (str), `new_metadata` (str) + - **"participant_name_changed"**: Called when a participant's name is changed. + - Arguments: `participant` (Participant), `old_name` (str), `new_name` (str) + - **"participant_attributes_changed"**: Called when a participant's attributes change. + - Arguments: `changed_attributes` (dict), `participant` (Participant) + - **"connection_quality_changed"**: Called when a participant's connection quality changes. + - Arguments: `participant` (Participant), `quality` (ConnectionQuality) + - **"transcription_received"**: Called when a transcription is received. + - Arguments: `segments` (list[TranscriptionSegment]), `participant` (Participant), `publication` (TrackPublication) + - **"data_received"**: Called when data is received. + - Arguments: `data_packet` (DataPacket) + - **"sip_dtmf_received"**: Called when a SIP DTMF signal is received. + - Arguments: `sip_dtmf` (SipDTMF) + - **"e2ee_state_changed"**: Called when a participant's E2EE state changes. + - Arguments: `participant` (Participant), `state` (EncryptionState) + - **"connection_state_changed"**: Called when the room's connection state changes. + - Arguments: `connection_state` (ConnectionState) + - **"connected"**: Called when the room is successfully connected. + - Arguments: None + - **"disconnected"**: Called when the room is disconnected. + - Arguments: `reason` (DisconnectReason) + - **"reconnecting"**: Called when the room is attempting to reconnect. + - Arguments: None + - **"reconnected"**: Called when the room has successfully reconnected. + - Arguments: None + + Example: + ```python + def on_participant_connected(participant): + print(f"Participant connected: {participant.identity}") + + room.on("participant_connected", on_participant_connected) + ``` + """ + return super().on(event, callback) + async def connect( self, url: str, token: str, options: RoomOptions = RoomOptions() ) -> None: + """Connects to a LiveKit room using the specified URL and token. + + Parameters: + url (str): The WebSocket URL of the LiveKit server to connect to. + token (str): The access token for authentication and authorization. + options (RoomOptions, optional): Additional options for the room connection. + + Raises: + ConnectError: If the connection fails. + + Example: + ```python + room = Room() + + # Listen for events before connecting to the room + @room.on("participant_connected") + def on_participant_connected(participant): + print(f"Participant connected: {participant.identity}") + + await room.connect("ws://localhost:7880", "your_token") + ``` + """ req = proto_ffi.FfiRequest() req.connect.url = url req.connect.token = token @@ -225,6 +363,7 @@ async def connect( self._task = self._loop.create_task(self._listen_task()) async def disconnect(self) -> None: + """Disconnects from the room.""" if not self.isconnected(): return diff --git a/livekit-rtc/livekit/rtc/utils.py b/livekit-rtc/livekit/rtc/utils.py new file mode 100644 index 00000000..b4371966 --- /dev/null +++ b/livekit-rtc/livekit/rtc/utils.py @@ -0,0 +1,85 @@ +from __future__ import annotations + +from .audio_frame import AudioFrame + + +__all__ = ["combine_audio_frames"] + + +def combine_audio_frames(buffer: AudioFrame | list[AudioFrame]) -> AudioFrame: + """ + Combines one or more `rtc.AudioFrame` objects into a single `rtc.AudioFrame`. + + This function concatenates the audio data from multiple frames, ensuring that + all frames have the same sample rate and number of channels. It efficiently + merges the data by preallocating the necessary memory and copying the frame + data without unnecessary reallocations. + + Args: + buffer: A single `rtc.AudioFrame` or a list of `rtc.AudioFrame` + objects to be combined. + + Returns: + rtc.AudioFrame: A new `rtc.AudioFrame` containing the combined audio data. + + Raises: + ValueError: If the buffer is empty. + ValueError: If frames have differing sample rates. + ValueError: If frames have differing numbers of channels. + + Example: + >>> frame1 = rtc.AudioFrame( + ... data=b"\x01\x02", sample_rate=48000, num_channels=2, samples_per_channel=1 + ... ) + >>> frame2 = rtc.AudioFrame( + ... data=b"\x03\x04", sample_rate=48000, num_channels=2, samples_per_channel=1 + ... ) + >>> combined_frame = combine_audio_frames([frame1, frame2]) + >>> combined_frame.data + b'\x01\x02\x03\x04' + >>> combined_frame.sample_rate + 48000 + >>> combined_frame.num_channels + 2 + >>> combined_frame.samples_per_channel + 2 + """ + if not isinstance(buffer, list): + return buffer + + if not buffer: + raise ValueError("buffer is empty") + + sample_rate = buffer[0].sample_rate + num_channels = buffer[0].num_channels + + total_data_length = 0 + total_samples_per_channel = 0 + + for frame in buffer: + if frame.sample_rate != sample_rate: + raise ValueError( + f"Sample rate mismatch: expected {sample_rate}, got {frame.sample_rate}" + ) + + if frame.num_channels != num_channels: + raise ValueError( + f"Channel count mismatch: expected {num_channels}, got {frame.num_channels}" + ) + + total_data_length += len(frame.data) + total_samples_per_channel += frame.samples_per_channel + + data = bytearray(total_data_length) + offset = 0 + for frame in buffer: + frame_data = frame.data.cast("b") + data[offset : offset + len(frame_data)] = frame_data + offset += len(frame_data) + + return AudioFrame( + data=data, + sample_rate=sample_rate, + num_channels=num_channels, + samples_per_channel=total_samples_per_channel, + )