diff --git a/livekit-rtc/livekit/rtc/__init__.py b/livekit-rtc/livekit/rtc/__init__.py index 9060631f..4bac0f6f 100644 --- a/livekit-rtc/livekit/rtc/__init__.py +++ b/livekit-rtc/livekit/rtc/__init__.py @@ -57,6 +57,7 @@ Track, VideoTrack, ) +from .event_emitter import EventEmitter from .track_publication import ( LocalTrackPublication, RemoteTrackPublication, @@ -131,6 +132,7 @@ "ChatMessage", "AudioResampler", "AudioResamplerQuality", + "EventEmitter", "combine_audio_frames", "__version__", ] diff --git a/livekit-rtc/livekit/rtc/_event_emitter.py b/livekit-rtc/livekit/rtc/_event_emitter.py deleted file mode 100644 index 3c534bbe..00000000 --- a/livekit-rtc/livekit/rtc/_event_emitter.py +++ /dev/null @@ -1,48 +0,0 @@ -from typing import Callable, Dict, Set, Optional, Generic, TypeVar - -T = TypeVar("T") - - -class EventEmitter(Generic[T]): - def __init__(self) -> None: - self._events: Dict[T, Set[Callable]] = dict() - - def emit(self, event: T, *args, **kwargs) -> None: - if event in self._events: - callables = self._events[event].copy() - for callback in callables: - callback(*args, **kwargs) - - def once(self, event: T, callback: Optional[Callable] = None) -> Callable: - if callback is not None: - - def once_callback(*args, **kwargs): - self.off(event, once_callback) - callback(*args, **kwargs) - - return self.on(event, once_callback) - else: - - def decorator(callback: Callable) -> Callable: - self.once(event, callback) - return callback - - return decorator - - def on(self, event: T, callback: Optional[Callable] = None) -> Callable: - if callback is not None: - if event not in self._events: - self._events[event] = set() - self._events[event].add(callback) - return callback - else: - - def decorator(callback: Callable) -> Callable: - self.on(event, callback) - return callback - - return decorator - - def off(self, event: T, callback: Callable) -> None: - if event in self._events: - self._events[event].remove(callback) diff --git a/livekit-rtc/livekit/rtc/_ffi_client.py b/livekit-rtc/livekit/rtc/_ffi_client.py index 57a9ae0a..146ce8c3 100644 --- a/livekit-rtc/livekit/rtc/_ffi_client.py +++ b/livekit-rtc/livekit/rtc/_ffi_client.py @@ -27,8 +27,7 @@ from ._proto import ffi_pb2 as proto_ffi from ._utils import Queue, classproperty - -logger = logging.getLogger("livekit") +from .log import logger _resource_files = ExitStack() atexit.register(_resource_files.close) diff --git a/livekit-rtc/livekit/rtc/chat.py b/livekit-rtc/livekit/rtc/chat.py index 032fdd2a..22e4b0c8 100644 --- a/livekit-rtc/livekit/rtc/chat.py +++ b/livekit-rtc/livekit/rtc/chat.py @@ -19,7 +19,7 @@ from typing import Any, Dict, Literal, Optional from .room import Room, Participant, DataPacket -from ._event_emitter import EventEmitter +from .event_emitter import EventEmitter from ._utils import generate_random_base62 _CHAT_TOPIC = "lk-chat-topic" diff --git a/livekit-rtc/livekit/rtc/event_emitter.py b/livekit-rtc/livekit/rtc/event_emitter.py new file mode 100644 index 00000000..edb17cba --- /dev/null +++ b/livekit-rtc/livekit/rtc/event_emitter.py @@ -0,0 +1,192 @@ +import inspect +from typing import Callable, Dict, Set, Optional, Generic, TypeVar + +from .log import logger + +T = TypeVar("T") + + +class EventEmitter(Generic[T]): + def __init__(self) -> None: + """ + Initialize a new instance of EventEmitter. + """ + self._events: Dict[T, Set[Callable]] = dict() + + def emit(self, event: T, *args) -> None: + """ + Trigger all callbacks associated with the given event. + + Args: + event (T): The event to emit. + *args: Positional arguments to pass to the callbacks. + + Example: + Basic usage of emit: + + ```python + emitter = EventEmitter[str]() + + def greet(name): + print(f"Hello, {name}!") + + emitter.on('greet', greet) + emitter.emit('greet', 'Alice') # Output: Hello, Alice! + ``` + """ + if event in self._events: + callables = self._events[event].copy() + for callback in callables: + try: + sig = inspect.signature(callback) + params = sig.parameters.values() + + has_varargs = any(p.kind == p.VAR_POSITIONAL for p in params) + if has_varargs: + callback(*args) + else: + positional_params = [ + p + for p in params + if p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD) + ] + num_params = len(positional_params) + num_args = min(len(args), num_params) + callback_args = args[:num_args] + + callback(*callback_args) + except Exception: + logger.exception(f"failed to emit event {event}") + + def once(self, event: T, callback: Optional[Callable] = None) -> Callable: + """ + Register a callback to be called only once when the event is emitted. + + If a callback is provided, it registers the callback directly. + If no callback is provided, it returns a decorator for use with function definitions. + + Args: + event (T): The event to listen for. + callback (Callable, optional): The callback to register. Defaults to None. + + Returns: + Callable: The registered callback or a decorator if callback is None. + + Example: + Using once with a direct callback: + + ```python + emitter = EventEmitter[str]() + + def greet_once(name): + print(f"Hello once, {name}!") + + emitter.once('greet', greet_once) + emitter.emit('greet', 'Bob') # Output: Hello once, Bob! + emitter.emit('greet', 'Bob') # No output, callback was removed after first call + ``` + + Using once as a decorator: + + ```python + emitter = EventEmitter[str]() + + @emitter.once('greet') + def greet_once(name): + print(f"Hello once, {name}!") + + emitter.emit('greet', 'Bob') # Output: Hello once, Bob! + emitter.emit('greet', 'Bob') # No output + ``` + """ + if callback is not None: + + def once_callback(*args, **kwargs): + self.off(event, once_callback) + callback(*args, **kwargs) + + return self.on(event, once_callback) + else: + + def decorator(callback: Callable) -> Callable: + self.once(event, callback) + return callback + + return decorator + + def on(self, event: T, callback: Optional[Callable] = None) -> Callable: + """ + Register a callback to be called whenever the event is emitted. + + If a callback is provided, it registers the callback directly. + If no callback is provided, it returns a decorator for use with function definitions. + + Args: + event (T): The event to listen for. + callback (Callable, optional): The callback to register. Defaults to None. + + Returns: + Callable: The registered callback or a decorator if callback is None. + + Example: + Using on with a direct callback: + + ```python + emitter = EventEmitter[str]() + + def greet(name): + print(f"Hello, {name}!") + + emitter.on('greet', greet) + emitter.emit('greet', 'Charlie') # Output: Hello, Charlie! + ``` + + Using on as a decorator: + + ```python + emitter = EventEmitter[str]() + + @emitter.on('greet') + def greet(name): + print(f"Hello, {name}!") + + emitter.emit('greet', 'Charlie') # Output: Hello, Charlie! + ``` + """ + if callback is not None: + if event not in self._events: + self._events[event] = set() + self._events[event].add(callback) + return callback + else: + + def decorator(callback: Callable) -> Callable: + self.on(event, callback) + return callback + + return decorator + + def off(self, event: T, callback: Callable) -> None: + """ + Unregister a callback from an event. + + Args: + event (T): The event to stop listening to. + callback (Callable): The callback to remove. + + Example: + Removing a callback: + + ```python + emitter = EventEmitter[str]() + + def greet(name): + print(f"Hello, {name}!") + + emitter.on('greet', greet) + emitter.off('greet', greet) + emitter.emit('greet', 'Dave') # No output, callback was removed + ``` + """ + if event in self._events: + self._events[event].remove(callback) diff --git a/livekit-rtc/livekit/rtc/log.py b/livekit-rtc/livekit/rtc/log.py new file mode 100644 index 00000000..72c505f8 --- /dev/null +++ b/livekit-rtc/livekit/rtc/log.py @@ -0,0 +1,3 @@ +import logging + +logger = logging.getLogger("livekit") diff --git a/livekit-rtc/livekit/rtc/room.py b/livekit-rtc/livekit/rtc/room.py index 41c287a0..deafb828 100644 --- a/livekit-rtc/livekit/rtc/room.py +++ b/livekit-rtc/livekit/rtc/room.py @@ -19,7 +19,7 @@ from dataclasses import dataclass, field from typing import Callable, Dict, Literal, Optional, cast -from ._event_emitter import EventEmitter +from .event_emitter import EventEmitter from ._ffi_client import FfiClient, FfiHandle from ._proto import ffi_pb2 as proto_ffi from ._proto import participant_pb2 as proto_participant