diff --git a/examples/video-stream/README.md b/examples/video-stream/README.md new file mode 100644 index 000000000..44d58ae71 --- /dev/null +++ b/examples/video-stream/README.md @@ -0,0 +1,40 @@ +# Video and Audio Synchronization Examples + +These examples demonstrate how to synchronize video and audio streams using the `AVSynchronizer` utility. + +## AVSynchronizer Usage + +The `AVSynchronizer` helps maintain synchronization between video and audio frames. The key principle is to push the initial synchronized video and audio frames together. After that, subsequent frames are synchronized automatically according to the configured video FPS and audio sample rate. + +```python +av_sync = AVSynchronizer( + audio_source=audio_source, + video_source=video_source, + video_fps=30.0, + video_queue_size_ms=100 +) + +# Push frames to synchronizer +await av_sync.push(video_frame) +await av_sync.push(audio_frame) +``` + +## Examples + +### 1. Video File Playback (`video_play.py`) +Shows how to stream video and audio from a media file while maintaining sync: + +- Reads video and audio streams separately from a media file +- Uses separate tasks to push video and audio frames to the synchronizer +- Since the streams are continuous, a larger `queue_size_ms` can be used, though this will increase memory usage + +### 2. Audio Visualization (`audio_wave.py`) +Demonstrates generating video based on audio input: + +- Generates audio frames with alternating sine waves and silence +- Creates video frames visualizing the audio waveform +- Shows how to handle cases with and without audio: + - When audio is present: Push synchronized video and audio frames + - During silence: Push only video frames +- Since video and audio frames are pushed in the same loop, each audio frame must be shorter than the audio source queue size to avoid blocking +- Uses a small `queue_size_ms` (e.g. 
50ms) to control frame generation speed during silence periods diff --git a/examples/video-stream/audio_wave.py b/examples/video-stream/audio_wave.py new file mode 100644 index 000000000..758d59050 --- /dev/null +++ b/examples/video-stream/audio_wave.py @@ -0,0 +1,288 @@ +import asyncio +import logging +import time +from collections import deque +from dataclasses import dataclass +from typing import AsyncIterable, Optional, Union + +import numpy as np +from dotenv import load_dotenv +from livekit import rtc +from livekit.agents import JobContext, WorkerOptions, cli +from livekit.agents.utils.av_sync import AVSynchronizer + +try: + import cv2 +except ImportError: + raise RuntimeError( + "cv2 is required to run this example, " + "install with `pip install opencv-python`" + ) + +# Load environment variables +load_dotenv() + +logger = logging.getLogger(__name__) + + +@dataclass +class MediaInfo: + video_width: int + video_height: int + video_fps: float + audio_sample_rate: int + audio_channels: int + + +class _AudioEndSentinel: + pass + + +async def audio_generator( + media_info: MediaInfo, + output_audio: asyncio.Queue[Union[rtc.AudioFrame, _AudioEndSentinel]], +): + """Generates audio frames with alternating sine wave and silence periods""" + frequency = 480 # Hz + amplitude = 0.5 + period = 7.0 + sine_duration = 5.0 # Duration of sine wave in each period + chunk_size = 1024 + + while True: + current_time = 0.0 + + # Generate audio for sine_duration seconds + while current_time < sine_duration: + t = np.linspace( + current_time, + current_time + chunk_size / media_info.audio_sample_rate, + num=chunk_size, + endpoint=False, + ) + # Create volume envelope using sine wave + volume = np.abs(np.sin(2 * np.pi * current_time / sine_duration)) + samples = amplitude * volume * np.sin(2 * np.pi * frequency * t) + + # Convert to int16, (samples, channels) + samples = (samples[:, np.newaxis] * 32767).astype(np.int16) + if media_info.audio_channels > 1: + samples = np.repeat(samples, media_info.audio_channels, axis=1) + + # Create audio frame + audio_frame = rtc.AudioFrame( + data=samples.tobytes(), + sample_rate=media_info.audio_sample_rate, + num_channels=samples.shape[1], + samples_per_channel=samples.shape[0], + ) + await output_audio.put(audio_frame) + current_time += chunk_size / media_info.audio_sample_rate + await asyncio.sleep(0) + await output_audio.put(_AudioEndSentinel()) + + # Simulate silence + silence_duration = period - sine_duration + await asyncio.sleep(silence_duration) + + +class WaveformVisualizer: + def __init__(self, history_length: int = 1000): + self.history_length = history_length + self.volume_history: deque[float] = deque(maxlen=history_length) + self.start_time = time.time() + + def draw_timestamp(self, canvas: np.ndarray, fps: float): + height, width = canvas.shape[:2] + text = f"{time.time() - self.start_time:.1f}s @ {fps:.1f}fps" + font_face = cv2.FONT_HERSHEY_SIMPLEX + font_scale = 2.0 + thickness = 2 + + (text_width, text_height), baseline = cv2.getTextSize( + text, font_face, font_scale, thickness + ) + x = (width - text_width) // 2 + y = int((height - text_height) * 0.4 + baseline) + cv2.putText(canvas, text, (x, y), font_face, font_scale, (0, 0, 0), thickness) + + def draw_current_wave( + self, canvas: np.ndarray, audio_samples: np.ndarray + ) -> np.ndarray: + """Draw the current waveform and return the current values""" + height, width = canvas.shape[:2] + center_y = height // 2 + 100 + + normalized_samples = audio_samples.astype(np.float32) / 32767.0 + 
normalized_samples = normalized_samples.mean(axis=1) # (samples,) + num_points = min(width, len(normalized_samples)) + + if len(normalized_samples) > num_points: + indices = np.linspace(0, len(normalized_samples) - 1, num_points, dtype=int) + plot_data = normalized_samples[indices] + else: + plot_data = normalized_samples + + x_coords = np.linspace(0, width, num_points, dtype=int) + y_coords = (plot_data * 200) + center_y + + cv2.line(canvas, (0, center_y), (width, center_y), (200, 200, 200), 1) + points = np.column_stack((x_coords, y_coords.astype(int))) + for i in range(len(points) - 1): + cv2.line(canvas, tuple(points[i]), tuple(points[i + 1]), (0, 255, 0), 2) + + return plot_data + + def draw_volume_history(self, canvas: np.ndarray, current_volume: float): + height, width = canvas.shape[:2] + center_y = height // 2 + + self.volume_history.append(current_volume) + cv2.line( + canvas, (0, center_y - 250), (width, center_y - 250), (200, 200, 200), 1 + ) + + volume_x = np.linspace(0, width, len(self.volume_history), dtype=int) + volume_y = center_y - 250 + (np.array(self.volume_history) * 200) + points = np.column_stack((volume_x, volume_y.astype(int))) + for i in range(len(points) - 1): + cv2.line(canvas, tuple(points[i]), tuple(points[i + 1]), (255, 0, 0), 2) + + def draw(self, canvas: np.ndarray, audio_samples: np.ndarray, fps: float): + self.draw_timestamp(canvas, fps) + plot_data = self.draw_current_wave(canvas, audio_samples) + current_volume = np.abs(plot_data).mean() + self.draw_volume_history(canvas, current_volume) + + +async def video_generator( + media_info: MediaInfo, + input_audio: asyncio.Queue[Union[rtc.AudioFrame, _AudioEndSentinel]], + av_sync: AVSynchronizer, # only used for drawing the actual fps on the video +) -> AsyncIterable[tuple[rtc.VideoFrame, Optional[rtc.AudioFrame]]]: + canvas = np.zeros( + (media_info.video_height, media_info.video_width, 4), dtype=np.uint8 + ) + canvas.fill(255) + + def _np_to_video_frame(image: np.ndarray) -> rtc.VideoFrame: + return rtc.VideoFrame( + width=image.shape[1], + height=image.shape[0], + type=rtc.VideoBufferType.RGBA, + data=image.tobytes(), + ) + + audio_samples_per_frame = int(media_info.audio_sample_rate / media_info.video_fps) + audio_buffer = np.zeros((0, media_info.audio_channels), dtype=np.int16) + wave_visualizer = WaveformVisualizer() + while True: + try: + # timeout has to be shorter than the frame interval to avoid starvation + audio_frame = await asyncio.wait_for( + input_audio.get(), timeout=0.5 / media_info.video_fps + ) + except asyncio.TimeoutError: + # generate frame without audio (e.g. 
silence state) + new_frame = canvas.copy() + wave_visualizer.draw(new_frame, np.zeros((1, 2)), av_sync.actual_fps) + video_frame = _np_to_video_frame(new_frame) + yield video_frame, None + + # speed is controlled by the video fps in av_sync + await asyncio.sleep(0) + continue + + if isinstance(audio_frame, _AudioEndSentinel): + # drop the audio buffer when the audio finished + audio_buffer = np.zeros((0, media_info.audio_channels), dtype=np.int16) + continue + + audio_samples = np.frombuffer(audio_frame.data, dtype=np.int16).reshape( + -1, audio_frame.num_channels + ) # (samples, channels) + # accumulate audio samples to the buffer + audio_buffer = np.concatenate([audio_buffer, audio_samples], axis=0) + + while audio_buffer.shape[0] >= audio_samples_per_frame: + sub_samples = audio_buffer[:audio_samples_per_frame, :] + audio_buffer = audio_buffer[audio_samples_per_frame:, :] + + new_frame = canvas.copy() + wave_visualizer.draw(new_frame, sub_samples, av_sync.actual_fps) + video_frame = _np_to_video_frame(new_frame) + sub_audio_frame = rtc.AudioFrame( + data=sub_samples.tobytes(), + sample_rate=audio_frame.sample_rate, + num_channels=sub_samples.shape[1], + samples_per_channel=sub_samples.shape[0], + ) + yield video_frame, sub_audio_frame + + +async def entrypoint(job: JobContext): + await job.connect() + room = job.room + + # Create media info + media_info = MediaInfo( + video_width=1280, + video_height=720, + video_fps=30.0, + audio_sample_rate=48000, + audio_channels=2, + ) + + # Create video and audio sources/tracks + queue_size_ms = 50 + video_source = rtc.VideoSource( + width=media_info.video_width, + height=media_info.video_height, + ) + audio_source = rtc.AudioSource( + sample_rate=media_info.audio_sample_rate, + num_channels=media_info.audio_channels, + queue_size_ms=queue_size_ms, + ) + + video_track = rtc.LocalVideoTrack.create_video_track("video", video_source) + audio_track = rtc.LocalAudioTrack.create_audio_track("audio", audio_source) + + # Publish tracks + video_options = rtc.TrackPublishOptions(source=rtc.TrackSource.SOURCE_CAMERA) + audio_options = rtc.TrackPublishOptions(source=rtc.TrackSource.SOURCE_MICROPHONE) + + await room.local_participant.publish_track(video_track, video_options) + await room.local_participant.publish_track(audio_track, audio_options) + + # Create AV synchronizer + av_sync = AVSynchronizer( + audio_source=audio_source, + video_source=video_source, + video_fps=media_info.video_fps, + video_queue_size_ms=queue_size_ms, + ) + + # Start audio generator + audio_queue = asyncio.Queue[Union[rtc.AudioFrame, _AudioEndSentinel]](maxsize=1) + audio_task = asyncio.create_task(audio_generator(media_info, audio_queue)) + + try: + async for video_frame, audio_frame in video_generator( + media_info, audio_queue, av_sync=av_sync + ): + await av_sync.push(video_frame) + if audio_frame: + await av_sync.push(audio_frame) + finally: + audio_task.cancel() + await av_sync.aclose() + + +if __name__ == "__main__": + cli.run_app( + WorkerOptions( + entrypoint_fnc=entrypoint, + job_memory_warn_mb=400, + ) + ) diff --git a/examples/video-stream/video_play.py b/examples/video-stream/video_play.py new file mode 100644 index 000000000..c65743e1a --- /dev/null +++ b/examples/video-stream/video_play.py @@ -0,0 +1,179 @@ +import asyncio +import logging +from dataclasses import dataclass +from pathlib import Path +from typing import AsyncIterable, Union + +import av +import numpy as np +from dotenv import load_dotenv +from livekit import rtc +from livekit.agents import JobContext, 
WorkerOptions, cli, utils +from livekit.agents.utils.av_sync import AVSynchronizer + +# Load environment variables +load_dotenv() + +logger = logging.getLogger(__name__) + + +@dataclass +class MediaInfo: + video_width: int + video_height: int + video_fps: float + audio_sample_rate: int + audio_channels: int + + +class MediaFileStreamer: + """Streams video and audio frames from a media file in an endless loop.""" + + def __init__(self, media_file: Union[str, Path]) -> None: + self._media_file = str(media_file) + # Create separate containers for each stream + self._video_container = av.open(self._media_file) + self._audio_container = av.open(self._media_file) + self._stopped = False + + # Cache media info + video_stream = self._video_container.streams.video[0] + audio_stream = self._audio_container.streams.audio[0] + self._info = MediaInfo( + video_width=video_stream.width, + video_height=video_stream.height, + video_fps=float(video_stream.average_rate), # type: ignore + audio_sample_rate=audio_stream.sample_rate, + audio_channels=audio_stream.channels, + ) + + @property + def info(self) -> MediaInfo: + return self._info + + async def stream_video(self) -> AsyncIterable[rtc.VideoFrame]: + """Streams video frames from the media file in an endless loop.""" + while not self._stopped: + self._video_container.seek(0) # Seek back to start + for av_frame in self._video_container.decode(video=0): + if self._stopped: + break + # Convert video frame to RGBA + frame = av_frame.to_rgb().to_ndarray() + frame_rgba = np.ones( + (frame.shape[0], frame.shape[1], 4), dtype=np.uint8 + ) + frame_rgba[:, :, :3] = frame + yield rtc.VideoFrame( + width=frame.shape[1], + height=frame.shape[0], + type=rtc.VideoBufferType.RGBA, + data=frame_rgba.tobytes(), + ) + + async def stream_audio(self) -> AsyncIterable[rtc.AudioFrame]: + """Streams audio frames from the media file in an endless loop.""" + while not self._stopped: + self._audio_container.seek(0) # Seek back to start + for av_frame in self._audio_container.decode(audio=0): + if self._stopped: + break + # Convert audio frame to raw int16 samples + frame = av_frame.to_ndarray().T # Transpose to (samples, channels) + frame = (frame * 32768).astype(np.int16) + yield rtc.AudioFrame( + data=frame.tobytes(), + sample_rate=self.info.audio_sample_rate, + num_channels=frame.shape[1], + samples_per_channel=frame.shape[0], + ) + + async def aclose(self) -> None: + """Closes the media container and stops streaming.""" + self._stopped = True + self._video_container.close() + self._audio_container.close() + + +async def entrypoint(job: JobContext): + await job.connect() + room = job.room + + # Create media streamer + # Should we add a sample video file? 
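+ # NOTE: "/path/to/video.mp4" is a placeholder; point it at a local media file that contains both a video and an audio stream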
+ media_path = "/path/to/video.mp4" + streamer = MediaFileStreamer(media_path) + media_info = streamer.info + + # Create video and audio sources/tracks + queue_size_ms = 1000 # 1 second + video_source = rtc.VideoSource( + width=media_info.video_width, + height=media_info.video_height, + ) + print(media_info) + audio_source = rtc.AudioSource( + sample_rate=media_info.audio_sample_rate, + num_channels=media_info.audio_channels, + queue_size_ms=queue_size_ms, + ) + + video_track = rtc.LocalVideoTrack.create_video_track("video", video_source) + audio_track = rtc.LocalAudioTrack.create_audio_track("audio", audio_source) + + # Publish tracks + video_options = rtc.TrackPublishOptions(source=rtc.TrackSource.SOURCE_CAMERA) + audio_options = rtc.TrackPublishOptions(source=rtc.TrackSource.SOURCE_MICROPHONE) + + await room.local_participant.publish_track(video_track, video_options) + await room.local_participant.publish_track(audio_track, audio_options) + + @utils.log_exceptions(logger=logger) + async def _push_video_frames( + video_stream: AsyncIterable[rtc.VideoFrame], av_sync: AVSynchronizer + ) -> None: + """Task to push video frames to the AV synchronizer.""" + async for frame in video_stream: + await av_sync.push(frame) + await asyncio.sleep(0) + + @utils.log_exceptions(logger=logger) + async def _push_audio_frames( + audio_stream: AsyncIterable[rtc.AudioFrame], av_sync: AVSynchronizer + ) -> None: + """Task to push audio frames to the AV synchronizer.""" + async for frame in audio_stream: + await av_sync.push(frame) + await asyncio.sleep(0) + + try: + av_sync = AVSynchronizer( + audio_source=audio_source, + video_source=video_source, + video_fps=media_info.video_fps, + video_queue_size_ms=queue_size_ms, + ) + + # Create and run video and audio streaming tasks + video_stream = streamer.stream_video() + audio_stream = streamer.stream_audio() + + video_task = asyncio.create_task(_push_video_frames(video_stream, av_sync)) + audio_task = asyncio.create_task(_push_audio_frames(audio_stream, av_sync)) + + # Wait for both tasks to complete + await asyncio.gather(video_task, audio_task) + await av_sync.wait_for_playout() + + finally: + await streamer.aclose() + await av_sync.aclose() + + +if __name__ == "__main__": + cli.run_app( + WorkerOptions( + entrypoint_fnc=entrypoint, + job_memory_warn_mb=400, + ) + ) diff --git a/livekit-agents/livekit/agents/utils/av_sync.py b/livekit-agents/livekit/agents/utils/av_sync.py new file mode 100644 index 000000000..3491d7019 --- /dev/null +++ b/livekit-agents/livekit/agents/utils/av_sync.py @@ -0,0 +1,172 @@ +import asyncio +import logging +import time +from collections import deque +from typing import Optional, Union + +import livekit.agents.utils as utils +from livekit import rtc + +logger = logging.getLogger(__name__) + + +class AVSynchronizer: + """Synchronize audio and video capture. 
+ + Usage: + av_sync = AVSynchronizer( + audio_source=audio_source, + video_source=video_source, + video_fps=video_fps, + ) + + async for video_frame, audio_frame in video_generator: + await av_sync.push(video_frame) + await av_sync.push(audio_frame) + """ + + def __init__( + self, + *, + audio_source: rtc.AudioSource, + video_source: rtc.VideoSource, + video_fps: float, + video_queue_size_ms: float = 100, + _max_delay_tolerance_ms: float = 300, + ): + self._audio_source = audio_source + self._video_source = video_source + self._video_fps = video_fps + self._video_queue_size_ms = video_queue_size_ms + self._max_delay_tolerance_ms = _max_delay_tolerance_ms + + self._stopped = False + + self._video_queue_max_size = int( + self._video_fps * self._video_queue_size_ms / 1000 + ) + if self._video_queue_size_ms > 0: + # ensure queue is bounded if queue size is specified + self._video_queue_max_size = max(1, self._video_queue_max_size) + + self._video_queue = asyncio.Queue[rtc.VideoFrame]( + maxsize=self._video_queue_max_size + ) + self._fps_controller = _FPSController( + expected_fps=self._video_fps, + max_delay_tolerance_ms=self._max_delay_tolerance_ms, + ) + self._capture_video_task = asyncio.create_task(self._capture_video()) + + async def push(self, frame: Union[rtc.VideoFrame, rtc.AudioFrame]) -> None: + if isinstance(frame, rtc.AudioFrame): + await self._audio_source.capture_frame(frame) + return + + await self._video_queue.put(frame) + + async def clear_queue(self) -> None: + self._audio_source.clear_queue() + while not self._video_queue.empty(): + await self._video_queue.get() + self._video_queue.task_done() # mark dropped frames as done so wait_for_playout() doesn't hang on join() + + async def wait_for_playout(self) -> None: + """Wait until all video and audio frames are played out.""" + await self._audio_source.wait_for_playout() + await self._video_queue.join() + + async def _capture_video(self) -> None: + while not self._stopped: + frame = await self._video_queue.get() + async with self._fps_controller: + self._video_source.capture_frame(frame) + self._video_queue.task_done() + + async def aclose(self) -> None: + self._stopped = True + if self._capture_video_task: + await utils.aio.gracefully_cancel(self._capture_video_task) + + @property + def actual_fps(self) -> float: + return self._fps_controller.actual_fps + + +class _FPSController: + def __init__( + self, *, expected_fps: float, max_delay_tolerance_ms: float = 300 + ) -> None: + """Controls frame rate by adjusting sleep time based on actual FPS. + + Usage: + async with _FPSController(expected_fps=30): + # process frame + pass + + Args: + expected_fps: Target frames per second + max_delay_tolerance_ms: Maximum delay tolerance in milliseconds + """ + self._expected_fps = expected_fps + self._frame_interval = 1.0 / expected_fps + self._max_delay_tolerance_secs = max_delay_tolerance_ms / 1000 + + self._next_frame_time: Optional[float] = None + self._fps_calc_winsize = max(2, int(1.0 * expected_fps)) + self._send_timestamps: deque[float] = deque(maxlen=self._fps_calc_winsize) + + async def __aenter__(self) -> None: + await self.wait_next_process() + + async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: + self.after_process() + + async def wait_next_process(self) -> None: + """Wait until it's time for the next frame. + + Adjusts sleep time based on actual FPS to maintain target rate. 
+ """ + current_time = time.perf_counter() + + # initialize the next frame time + if self._next_frame_time is None: + self._next_frame_time = current_time + + # calculate sleep time + sleep_time = self._next_frame_time - current_time + if sleep_time > 0: + await asyncio.sleep(sleep_time) + else: + # check if significantly behind schedule + if -sleep_time > self._max_delay_tolerance_secs: + logger.warning( + f"Frame capture was behind schedule for " + f"{-sleep_time * 1000:.2f} ms" + ) + self._next_frame_time = time.perf_counter() + + def after_process(self) -> None: + """Update timing information after processing a frame.""" + assert ( + self._next_frame_time is not None + ), "wait_next_process must be called first" + + # update timing information + self._send_timestamps.append(time.perf_counter()) + + # calculate next frame time + self._next_frame_time += self._frame_interval + + @property + def expected_fps(self) -> float: + return self._expected_fps + + @property + def actual_fps(self) -> float: + """Get current average FPS.""" + if len(self._send_timestamps) < 2: + return 0 + + return (len(self._send_timestamps) - 1) / ( + self._send_timestamps[-1] - self._send_timestamps[0] + )