From fdbf5b739f12f6add1db0f9525e86f687fe0c8b9 Mon Sep 17 00:00:00 2001 From: Long Chen Date: Wed, 4 Dec 2024 00:04:10 +0800 Subject: [PATCH 01/10] feat: add AVSynchronizer --- .../livekit/agents/utils/av_sync.py | 146 ++++++++++++++++++ 1 file changed, 146 insertions(+) create mode 100644 livekit-agents/livekit/agents/utils/av_sync.py diff --git a/livekit-agents/livekit/agents/utils/av_sync.py b/livekit-agents/livekit/agents/utils/av_sync.py new file mode 100644 index 000000000..f368a6833 --- /dev/null +++ b/livekit-agents/livekit/agents/utils/av_sync.py @@ -0,0 +1,146 @@ +import asyncio +import logging +import time +from collections import deque + +import livekit.agents.utils as utils +from livekit import rtc + +logger = logging.getLogger(__name__) + + +class AVSynchronizer: + """Synchronize audio and video capture. + + Usage: + av_sync = AVSynchronizer( + audio_source=audio_source, + video_source=video_source, + video_sample_rate=video_sample_rate, + ) + av_sync.start() + + async for video_frame, audio_frame in video_generator: + av_sync.push(video_frame) + av_sync.push(audio_frame) + """ + + def __init__( + self, + *, + audio_source: rtc.AudioSource, + video_source: rtc.VideoSource, + video_sample_rate: float, + video_queue_size_ms: float = 1000, + _max_delay_tolerance_ms: float = 300, + ): + self._audio_source = audio_source + self._video_source = video_source + self._video_sample_rate = video_sample_rate + + self._video_queue_size_ms = video_queue_size_ms + self._video_queue_max_size = int(video_sample_rate * video_queue_size_ms / 1000) + self._video_queue = asyncio.Queue[rtc.VideoFrame]( + maxsize=self._video_queue_max_size + ) + + self._fps_controller = _FPSController( + expected_fps=video_sample_rate, + max_delay_tolerance_ms=_max_delay_tolerance_ms, + ) + self._stopped = False + + def start(self) -> None: + self._capture_video_task = asyncio.create_task(self._capture_video_task()) + + async def push(self, frame: rtc.VideoFrame | rtc.AudioFrame) -> None: + if isinstance(frame, rtc.AudioFrame): + await self._audio_source.capture_frame(frame) + return + + await self._video_queue.put(frame) + + async def _capture_video_task(self) -> None: + while not self._stopped: + frame = await self._video_queue.get() + + await self._fps_controller.wait_next_process() + self._video_source.capture_frame(frame) + self._fps_controller.after_process() + + async def aclose(self) -> None: + self._stopped = True + if self._capture_video_task: + await utils.aio.gracefully_cancel(self._capture_video_task) + + +class _FPSController: + def __init__( + self, *, expected_fps: float, max_delay_tolerance_ms: float = 300 + ) -> None: + """Controls frame rate by adjusting sleep time based on actual FPS. + + Usage: + fps_controller = _FPSController(expected_fps=30, max_delay_tolerance_ms=300) + while True: + await fps_controller.wait_next_frame() + # process frame + await fps_controller.after_process() + + Args: + expected_fps: Target frames per second + max_delay_tolerance_ms: Maximum delay tolerance in milliseconds + """ + self._expected_fps = expected_fps + self._frame_interval = 1.0 / expected_fps + + self._max_delay_tolerance_secs = max_delay_tolerance_ms / 1000 + + self._next_frame_time = None + self._send_timestamps = deque(maxlen=self._fps_calc_winsize) + + async def wait_next_process(self) -> None: + """Wait until it's time for the next frame. + + Adjusts sleep time based on actual FPS to maintain target rate. + """ + current_time = time.perf_counter() + + # initialize the next frame time + if self._next_frame_time is None: + self._next_frame_time = current_time + + # calculate sleep time + sleep_time = self._next_frame_time - current_time + if sleep_time > 0: + await asyncio.sleep(sleep_time) + else: + # check if significantly behind schedule + if -sleep_time > self._max_delay_tolerance_secs: + logger.warning( + f"Frame capture was behind schedule for " + f"{-sleep_time * 1000:.2f} ms" + ) + self._next_frame_time = time.perf_counter() + + def after_process(self) -> None: + """Update timing information after processing a frame.""" + # update timing information + self._send_timestamps.append(time.perf_counter()) + + # calculate next frame time + self._next_frame_time += self._frame_interval + + @property + def expected_fps(self) -> float: + return self._expected_fps + + @property + def actual_fps(self) -> float: + """Get current average FPS.""" + if len(self._send_timestamps) < 2: + return 0 + + return (len(self._send_timestamps) - 1) / ( + self._send_timestamps[-1] - self._send_timestamps[0] + ) From 55548b88bfab5aae2444fc126a9a4f8829f7fb0c Mon Sep 17 00:00:00 2001 From: Long Chen Date: Wed, 4 Dec 2024 00:14:51 +0800 Subject: [PATCH 02/10] fix: remove start for av sync --- .../livekit/agents/utils/av_sync.py | 37 ++++++++++++------- 1 file changed, 24 insertions(+), 13 deletions(-) diff --git a/livekit-agents/livekit/agents/utils/av_sync.py b/livekit-agents/livekit/agents/utils/av_sync.py index f368a6833..ce74f72cf 100644 --- a/livekit-agents/livekit/agents/utils/av_sync.py +++ b/livekit-agents/livekit/agents/utils/av_sync.py @@ -18,7 +18,6 @@ class AVSynchronizer: video_source=video_source, video_sample_rate=video_sample_rate, ) - av_sync.start() async for video_frame, audio_frame in video_generator: av_sync.push(video_frame) @@ -28,12 +27,17 @@ class AVSynchronizer: def __init__( self, *, - audio_source: rtc.AudioSource, - video_source: rtc.VideoSource, - video_sample_rate: float, + audio_source: rtc.AudioSource | None, + video_source: rtc.VideoSource | None, + video_sample_rate: float | None = None, video_queue_size_ms: float = 1000, _max_delay_tolerance_ms: float = 300, ): + if video_source is not None and video_sample_rate is None: + raise ValueError( + "video_sample_rate is required when video_source is provided" + ) + self._audio_source = audio_source self._video_source = video_source self._video_sample_rate = video_sample_rate @@ -43,30 +47,37 @@ def __init__( self._video_queue = asyncio.Queue[rtc.VideoFrame]( maxsize=self._video_queue_max_size ) + self._max_delay_tolerance_ms = _max_delay_tolerance_ms - self._fps_controller = _FPSController( - expected_fps=video_sample_rate, - max_delay_tolerance_ms=_max_delay_tolerance_ms, - ) self._stopped = False - - def start(self) -> None: - self._capture_video_task = asyncio.create_task(self._capture_video_task()) + self._capture_video_task = None + if self._video_source is not None: + self._capture_video_task = asyncio.create_task(self._capture_video_task()) async def push(self, frame: rtc.VideoFrame | rtc.AudioFrame) -> None: if isinstance(frame, rtc.AudioFrame): + if self._audio_source is None: + logger.warning("No audio source provided") + return await self._audio_source.capture_frame(frame) return + if self._video_source is None: + logger.warning("No video source provided") + return await self._video_queue.put(frame) async def _capture_video_task(self) -> None: + fps_controller = _FPSController( + expected_fps=self._video_sample_rate, + max_delay_tolerance_ms=self._max_delay_tolerance_ms, + ) while not self._stopped: frame = await self._video_queue.get() - await self._fps_controller.wait_next_process() + await fps_controller.wait_next_process() self._video_source.capture_frame(frame) - self._fps_controller.after_process() + fps_controller.after_process() async def aclose(self) -> None: self._stopped = True From ff034d945f23d1b77cdfbbaab6e73e1b2ea4e5bc Mon Sep 17 00:00:00 2001 From: Long Chen Date: Wed, 4 Dec 2024 11:55:17 +0800 Subject: [PATCH 03/10] fix: fix types --- .../livekit/agents/utils/av_sync.py | 48 ++++++++++++------- 1 file changed, 32 insertions(+), 16 deletions(-) diff --git a/livekit-agents/livekit/agents/utils/av_sync.py b/livekit-agents/livekit/agents/utils/av_sync.py index ce74f72cf..117749823 100644 --- a/livekit-agents/livekit/agents/utils/av_sync.py +++ b/livekit-agents/livekit/agents/utils/av_sync.py @@ -2,6 +2,7 @@ import logging import time from collections import deque +from typing import Optional, Union import livekit.agents.utils as utils from livekit import rtc @@ -27,9 +28,9 @@ class AVSynchronizer: def __init__( self, *, - audio_source: rtc.AudioSource | None, - video_source: rtc.VideoSource | None, - video_sample_rate: float | None = None, + audio_source: Optional[rtc.AudioSource], + video_source: Optional[rtc.VideoSource], + video_sample_rate: Optional[float], video_queue_size_ms: float = 1000, _max_delay_tolerance_ms: float = 300, ): @@ -41,20 +42,24 @@ def __init__( self._audio_source = audio_source self._video_source = video_source self._video_sample_rate = video_sample_rate - self._video_queue_size_ms = video_queue_size_ms - self._video_queue_max_size = int(video_sample_rate * video_queue_size_ms / 1000) - self._video_queue = asyncio.Queue[rtc.VideoFrame]( - maxsize=self._video_queue_max_size - ) self._max_delay_tolerance_ms = _max_delay_tolerance_ms self._stopped = False - self._capture_video_task = None - if self._video_source is not None: - self._capture_video_task = asyncio.create_task(self._capture_video_task()) - async def push(self, frame: rtc.VideoFrame | rtc.AudioFrame) -> None: + self._video_queue: Optional[asyncio.Queue[rtc.VideoFrame]] = None + self._capture_video_task: Optional[asyncio.Task[None]] = None + if self._video_source and self._video_sample_rate is not None: + _video_queue_max_size = int( + self._video_sample_rate * self._video_queue_size_ms / 1000 + ) + self._video_queue = asyncio.Queue[rtc.VideoFrame]( + maxsize=_video_queue_max_size + ) + + self._capture_video_task = asyncio.create_task(self._capture_video()) + + async def push(self, frame: Union[rtc.VideoFrame, rtc.AudioFrame]) -> None: if isinstance(frame, rtc.AudioFrame): if self._audio_source is None: logger.warning("No audio source provided") @@ -62,12 +67,18 @@ async def push(self, frame: rtc.VideoFrame | rtc.AudioFrame) -> None: await self._audio_source.capture_frame(frame) return - if self._video_source is None: + if self._video_queue is None: logger.warning("No video source provided") return await self._video_queue.put(frame) - async def _capture_video_task(self) -> None: + async def _capture_video(self) -> None: + assert ( + self._video_source + and self._video_queue + and self._video_sample_rate is not None + ) + fps_controller = _FPSController( expected_fps=self._video_sample_rate, max_delay_tolerance_ms=self._max_delay_tolerance_ms, @@ -107,8 +118,9 @@ def __init__( self._max_delay_tolerance_secs = max_delay_tolerance_ms / 1000 - self._next_frame_time = None - self._send_timestamps = deque(maxlen=self._fps_calc_winsize) + self._next_frame_time: float | None = None + self._fps_calc_winsize = max(2, int(0.5 * expected_fps)) + self._send_timestamps: deque[float] = deque(maxlen=self._fps_calc_winsize) async def wait_next_process(self) -> None: """Wait until it's time for the next frame. @@ -136,6 +148,10 @@ async def wait_next_process(self) -> None: def after_process(self) -> None: """Update timing information after processing a frame.""" + assert ( + self._next_frame_time is not None + ), "wait_next_process must be called first" + # update timing information self._send_timestamps.append(time.perf_counter()) From d4066ead97f649a8173787b3c656f4bed43a522b Mon Sep 17 00:00:00 2001 From: Long Chen Date: Wed, 4 Dec 2024 23:11:15 +0800 Subject: [PATCH 04/10] fix: add enter and exit for _FPSController --- .../livekit/agents/utils/av_sync.py | 73 +++++++------------ 1 file changed, 27 insertions(+), 46 deletions(-) diff --git a/livekit-agents/livekit/agents/utils/av_sync.py b/livekit-agents/livekit/agents/utils/av_sync.py index 117749823..9ba931f95 100644 --- a/livekit-agents/livekit/agents/utils/av_sync.py +++ b/livekit-agents/livekit/agents/utils/av_sync.py @@ -17,78 +17,56 @@ class AVSynchronizer: av_sync = AVSynchronizer( audio_source=audio_source, video_source=video_source, - video_sample_rate=video_sample_rate, + video_fps=video_fps, ) async for video_frame, audio_frame in video_generator: - av_sync.push(video_frame) - av_sync.push(audio_frame) + await av_sync.push(video_frame) + await av_sync.push(audio_frame) """ def __init__( self, *, - audio_source: Optional[rtc.AudioSource], - video_source: Optional[rtc.VideoSource], - video_sample_rate: Optional[float], + audio_source: rtc.AudioSource, + video_source: rtc.VideoSource, + video_fps: float, video_queue_size_ms: float = 1000, _max_delay_tolerance_ms: float = 300, ): - if video_source is not None and video_sample_rate is None: - raise ValueError( - "video_sample_rate is required when video_source is provided" - ) - self._audio_source = audio_source self._video_source = video_source - self._video_sample_rate = video_sample_rate + self._video_fps = video_fps self._video_queue_size_ms = video_queue_size_ms self._max_delay_tolerance_ms = _max_delay_tolerance_ms self._stopped = False - self._video_queue: Optional[asyncio.Queue[rtc.VideoFrame]] = None - self._capture_video_task: Optional[asyncio.Task[None]] = None - if self._video_source and self._video_sample_rate is not None: - _video_queue_max_size = int( - self._video_sample_rate * self._video_queue_size_ms / 1000 - ) - self._video_queue = asyncio.Queue[rtc.VideoFrame]( - maxsize=_video_queue_max_size - ) - - self._capture_video_task = asyncio.create_task(self._capture_video()) + self._video_queue_max_size = int( + self._video_fps * self._video_queue_size_ms / 1000 + ) + self._video_queue = asyncio.Queue[rtc.VideoFrame]( + maxsize=self._video_queue_max_size + ) + self._capture_video_task = asyncio.create_task(self._capture_video()) async def push(self, frame: Union[rtc.VideoFrame, rtc.AudioFrame]) -> None: if isinstance(frame, rtc.AudioFrame): - if self._audio_source is None: - logger.warning("No audio source provided") - return + # TODO: test if frame duration is too long await self._audio_source.capture_frame(frame) return - if self._video_queue is None: - logger.warning("No video source provided") - return await self._video_queue.put(frame) async def _capture_video(self) -> None: - assert ( - self._video_source - and self._video_queue - and self._video_sample_rate is not None - ) - fps_controller = _FPSController( - expected_fps=self._video_sample_rate, + expected_fps=self._video_fps, max_delay_tolerance_ms=self._max_delay_tolerance_ms, ) while not self._stopped: frame = await self._video_queue.get() - - await fps_controller.wait_next_process() - self._video_source.capture_frame(frame) - fps_controller.after_process() + async with fps_controller: + self._video_source.capture_frame(frame) async def aclose(self) -> None: self._stopped = True @@ -103,11 +81,9 @@ def __init__( """Controls frame rate by adjusting sleep time based on actual FPS. Usage: - fps_controller = _FPSController(expected_fps=30, max_delay_tolerance_ms=300) - while True: - await fps_controller.wait_next_frame() + async with _FPSController(expected_fps=30): # process frame - await fps_controller.after_process() + pass Args: expected_fps: Target frames per second @@ -115,13 +91,18 @@ def __init__( """ self._expected_fps = expected_fps self._frame_interval = 1.0 / expected_fps - self._max_delay_tolerance_secs = max_delay_tolerance_ms / 1000 - self._next_frame_time: float | None = None + self._next_frame_time: Optional[float] = None self._fps_calc_winsize = max(2, int(0.5 * expected_fps)) self._send_timestamps: deque[float] = deque(maxlen=self._fps_calc_winsize) + async def __aenter__(self) -> None: + await self.wait_next_process() + + async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: + self.after_process() + async def wait_next_process(self) -> None: """Wait until it's time for the next frame. From 58d2f8717c0db9a47497838063f7645c5c81d9e5 Mon Sep 17 00:00:00 2001 From: Long Chen Date: Thu, 5 Dec 2024 00:10:03 +0800 Subject: [PATCH 05/10] fix: add av sync example --- examples/video-stream/av_sync_agent.py | 172 ++++++++++++++++++ .../livekit/agents/utils/av_sync.py | 4 + 2 files changed, 176 insertions(+) create mode 100644 examples/video-stream/av_sync_agent.py diff --git a/examples/video-stream/av_sync_agent.py b/examples/video-stream/av_sync_agent.py new file mode 100644 index 000000000..751e322d0 --- /dev/null +++ b/examples/video-stream/av_sync_agent.py @@ -0,0 +1,172 @@ +import asyncio +import logging +from dataclasses import dataclass +from pathlib import Path +from typing import AsyncIterable + +import av +import numpy as np +from dotenv import load_dotenv +from livekit import rtc +from livekit.agents import JobContext, WorkerOptions, cli, utils +from livekit.agents.utils.av_sync import AVSynchronizer + +# Load environment variables +load_dotenv() + +logger = logging.getLogger(__name__) + + +@dataclass +class MediaInfo: + video_width: int + video_height: int + video_fps: float + audio_sample_rate: int + audio_channels: int + + +class MediaFileStreamer: + """Streams video and audio frames from a media file.""" + + def __init__(self, media_file: str | Path) -> None: + self._media_file = str(media_file) + self._container = av.open(self._media_file) + + self._video_stream = self._container.streams.video[0] + self._audio_stream = self._container.streams.audio[0] + + # Cache media info + self._info = MediaInfo( + video_width=self._video_stream.width, + video_height=self._video_stream.height, + video_fps=float(self._video_stream.average_rate), + audio_sample_rate=self._audio_stream.sample_rate, + audio_channels=self._audio_stream.channels, + ) + + @property + def info(self) -> MediaInfo: + return self._info + + async def stream_video(self) -> AsyncIterable[rtc.VideoFrame]: + """Streams video frames from the media file.""" + container = av.open(self._media_file) + try: + for frame in container.decode(video=0): + # Convert video frame to RGBA + frame = frame.to_rgb().to_ndarray() + frame_rgba = np.ones( + (frame.shape[0], frame.shape[1], 4), dtype=np.uint8 + ) + frame_rgba[:, :, :3] = frame + yield rtc.VideoFrame( + width=frame.shape[1], + height=frame.shape[0], + type=rtc.VideoBufferType.RGBA, + data=frame_rgba.tobytes(), + ) + finally: + container.close() + + async def stream_audio(self) -> AsyncIterable[rtc.AudioFrame]: + """Streams audio frames from the media file.""" + container = av.open(self._media_file) + try: + for frame in container.decode(audio=0): + # Convert audio frame to raw int16 samples + frame: np.ndarray = frame.to_ndarray(format="s16") + frame = (frame * 32768).astype(np.int16) + yield rtc.AudioFrame( + data=frame.tobytes(), + sample_rate=self.info.audio_sample_rate, + num_channels=frame.shape[0], + samples_per_channel=frame.shape[1], + ) + finally: + container.close() + + async def aclose(self) -> None: + """Closes the media container.""" + self._container.close() + + +async def entrypoint(job: JobContext): + await job.connect() + room = job.room + + # Create media streamer + media_path = "/path/to/sample.mp4" + streamer = MediaFileStreamer(media_path) + media_info = streamer.info + + # Create video and audio sources/tracks + queue_size_ms = 100 + video_source = rtc.VideoSource( + width=media_info.video_width, + height=media_info.video_height, + ) + audio_source = rtc.AudioSource( + sample_rate=media_info.audio_sample_rate, + num_channels=media_info.audio_channels, + queue_size_ms=queue_size_ms, + ) + + video_track = rtc.LocalVideoTrack.create_video_track("video", video_source) + audio_track = rtc.LocalAudioTrack.create_audio_track("audio", audio_source) + + # Publish tracks + video_options = rtc.TrackPublishOptions(source=rtc.TrackSource.SOURCE_CAMERA) + audio_options = rtc.TrackPublishOptions(source=rtc.TrackSource.SOURCE_MICROPHONE) + + await room.local_participant.publish_track(video_track, video_options) + await room.local_participant.publish_track(audio_track, audio_options) + + # Create AV synchronizer + av_sync = AVSynchronizer( + audio_source=audio_source, + video_source=video_source, + video_fps=media_info.video_fps, + video_queue_size_ms=queue_size_ms, + ) + + @utils.log_exceptions(logger=logger) + async def _push_video_frames( + video_stream: AsyncIterable[rtc.VideoFrame], av_sync: AVSynchronizer + ) -> None: + """Task to push video frames to the AV synchronizer.""" + async for frame in video_stream: + await av_sync.push(frame) + + @utils.log_exceptions(logger=logger) + async def _push_audio_frames( + audio_stream: AsyncIterable[rtc.AudioFrame], av_sync: AVSynchronizer + ) -> None: + """Task to push audio frames to the AV synchronizer.""" + async for frame in audio_stream: + await av_sync.push(frame) + + try: + while True: + # Create and run video and audio streaming tasks + video_stream = streamer.stream_video() + audio_stream = streamer.stream_audio() + + video_task = asyncio.create_task(_push_video_frames(video_stream, av_sync)) + audio_task = asyncio.create_task(_push_audio_frames(audio_stream, av_sync)) + + # Wait for both tasks to complete + # TODO: wait the frame in buffer to be processed + await asyncio.gather(video_task, audio_task) + finally: + await av_sync.aclose() + await streamer.aclose() + + +if __name__ == "__main__": + cli.run_app( + WorkerOptions( + entrypoint_fnc=entrypoint, + job_memory_warn_mb=400, + ) + ) diff --git a/livekit-agents/livekit/agents/utils/av_sync.py b/livekit-agents/livekit/agents/utils/av_sync.py index 9ba931f95..4bda2880c 100644 --- a/livekit-agents/livekit/agents/utils/av_sync.py +++ b/livekit-agents/livekit/agents/utils/av_sync.py @@ -119,6 +119,10 @@ async def wait_next_process(self) -> None: if sleep_time > 0: await asyncio.sleep(sleep_time) else: + logger.debug( + "Sync state", + extra={"sleep_time": sleep_time, "fps": self.actual_fps}, + ) # check if significantly behind schedule if -sleep_time > self._max_delay_tolerance_secs: logger.warning( From 4663b07a86b0514d961391ea8fa038650deff940 Mon Sep 17 00:00:00 2001 From: Long Chen Date: Thu, 5 Dec 2024 22:26:55 +0800 Subject: [PATCH 06/10] fix: add audio wave example --- examples/video-stream/audio_wave.py | 251 ++++++++++++++++++ .../{av_sync_agent.py => video_play.py} | 16 +- 2 files changed, 259 insertions(+), 8 deletions(-) create mode 100644 examples/video-stream/audio_wave.py rename examples/video-stream/{av_sync_agent.py => video_play.py} (92%) diff --git a/examples/video-stream/audio_wave.py b/examples/video-stream/audio_wave.py new file mode 100644 index 000000000..f72f9da64 --- /dev/null +++ b/examples/video-stream/audio_wave.py @@ -0,0 +1,251 @@ +import asyncio +import logging +import time +from dataclasses import dataclass +from typing import AsyncIterable, Optional, Union + +import numpy as np +from dotenv import load_dotenv +from livekit import rtc +from livekit.agents import JobContext, WorkerOptions, cli +from livekit.agents.utils.av_sync import AVSynchronizer + +try: + import cv2 +except ImportError: + raise RuntimeError( + "cv2 is required to run this example, " + "install with `pip install opencv-python`" + ) + +# Load environment variables +load_dotenv() + +logger = logging.getLogger(__name__) + + +@dataclass +class MediaInfo: + video_width: int + video_height: int + video_fps: float + audio_sample_rate: int + audio_channels: int + + +class _AudioEndSentinel: + pass + + +async def audio_generator( + media_info: MediaInfo, + output_audio: asyncio.Queue[Union[rtc.AudioFrame, _AudioEndSentinel]], +): + """Generates audio frames with alternating sine wave and silence periods""" + frequency = 480 # Hz + amplitude = 0.5 + period = 7.0 + sine_duration = 5.0 # Duration of sine wave in each period + samples_per_frame = 1024 + + while True: + current_time = 0.0 + + # Generate audio for sine_duration seconds + while current_time < sine_duration: + t = np.linspace( + current_time, + current_time + samples_per_frame / media_info.audio_sample_rate, + samples_per_frame, + endpoint=False, + ) + # Create volume envelope using sine wave + volume = np.abs(np.sin(2 * np.pi * current_time / sine_duration)) + samples = amplitude * volume * np.sin(2 * np.pi * frequency * t) + + # Convert to int16 + samples = (samples * 32767).astype(np.int16) + + # Create audio frame + audio_frame = rtc.AudioFrame( + data=samples.tobytes(), + sample_rate=media_info.audio_sample_rate, + num_channels=1, + samples_per_channel=samples_per_frame, + ) + + await output_audio.put(audio_frame) + current_time += samples_per_frame / media_info.audio_sample_rate + await asyncio.sleep(0) + await output_audio.put(_AudioEndSentinel()) + + # Simulate silence + silence_duration = period - sine_duration + await asyncio.sleep(silence_duration) + + +def _draw_timestamp(canvas: np.ndarray, timestamp: float): + height, width = canvas.shape[:2] + text = f"{timestamp:.2f}" + font_face = cv2.FONT_HERSHEY_SIMPLEX + font_scale = 2.0 + thickness = 2 + + (text_width, text_height), baseline = cv2.getTextSize( + text, font_face, font_scale, thickness + ) + x = (width - text_width) // 2 + y = int((height - text_height) * 0.4 + baseline) + cv2.putText(canvas, text, (x, y), font_face, font_scale, (0, 0, 0), thickness) + + +def _draw_volume(canvas: np.ndarray, audio_samples: np.ndarray): + """Draws an audio waveform visualization""" + height, width = canvas.shape[:2] + center_y = height // 2 + 100 + + # Normalize audio samples to [-1, 1] + normalized_samples = audio_samples.astype(np.float32) / 32767.0 + + num_points = min(width, len(normalized_samples[0])) + if len(normalized_samples[0]) > num_points: + indices = np.linspace(0, len(normalized_samples[0]) - 1, num_points, dtype=int) + plot_data = normalized_samples[0][indices] + else: + plot_data = normalized_samples[0] + + x_coords = np.linspace(0, width, num_points, dtype=int) + y_coords = (plot_data * 200) + center_y # Scale the wave amplitude + + # Draw the center line and waveform + cv2.line(canvas, (0, center_y), (width, center_y), (200, 200, 200), 1) + points = np.column_stack((x_coords, y_coords.astype(int))) + for i in range(len(points) - 1): + cv2.line(canvas, tuple(points[i]), tuple(points[i + 1]), (0, 255, 0), 2) + + +async def video_generator( + media_info: MediaInfo, + input_audio: asyncio.Queue[Union[rtc.AudioFrame, _AudioEndSentinel]], +) -> AsyncIterable[tuple[rtc.VideoFrame, Optional[rtc.AudioFrame]]]: + canvas = np.zeros( + (media_info.video_height, media_info.video_width, 4), dtype=np.uint8 + ) + canvas.fill(255) + + def _np_to_video_frame(canvas: np.ndarray) -> rtc.VideoFrame: + return rtc.VideoFrame( + width=canvas.shape[1], + height=canvas.shape[0], + type=rtc.VideoBufferType.RGBA, + data=canvas.tobytes(), + ) + + audio_samples_per_frame = int(media_info.audio_sample_rate / media_info.video_fps) + audio_buffer = np.zeros((media_info.audio_channels, 0), dtype=np.int16) + while True: + try: + audio_frame = input_audio.get_nowait() + except asyncio.QueueEmpty: + # silence frame + new_frame = canvas.copy() + _draw_timestamp(new_frame, time.time()) + _draw_volume(new_frame, np.zeros((1, 2))) + video_frame = _np_to_video_frame(new_frame) + yield video_frame, None + + # speed is controlled by the video fps in av_sync + await asyncio.sleep(0) + continue + + if isinstance(audio_frame, _AudioEndSentinel): + # reset the audio buffer + audio_buffer = np.zeros((media_info.audio_channels, 0), dtype=np.int16) + continue + + audio_samples = np.frombuffer(audio_frame.data, dtype=np.int16).reshape( + audio_frame.num_channels, -1 + ) + # append audio samples to the buffer + audio_buffer = np.concatenate([audio_buffer, audio_samples], axis=1) + while audio_buffer.shape[1] >= audio_samples_per_frame: + sub_samples = audio_buffer[:, :audio_samples_per_frame] + audio_buffer = audio_buffer[:, audio_samples_per_frame:] + + new_frame = canvas.copy() + _draw_timestamp(new_frame, time.time()) + _draw_volume(new_frame, sub_samples) + video_frame = _np_to_video_frame(new_frame) + sub_audio_frame = rtc.AudioFrame( + data=sub_samples.tobytes(), + sample_rate=audio_frame.sample_rate, + num_channels=audio_frame.num_channels, + samples_per_channel=sub_samples.shape[1], + ) + yield video_frame, sub_audio_frame + + +async def entrypoint(job: JobContext): + await job.connect() + room = job.room + + # Create media info + media_info = MediaInfo( + video_width=1280, + video_height=720, + video_fps=30.0, + audio_sample_rate=48000, + audio_channels=1, + ) + + # Create video and audio sources/tracks + queue_size_ms = 100 + video_source = rtc.VideoSource( + width=media_info.video_width, + height=media_info.video_height, + ) + audio_source = rtc.AudioSource( + sample_rate=media_info.audio_sample_rate, + num_channels=media_info.audio_channels, + queue_size_ms=queue_size_ms, + ) + + video_track = rtc.LocalVideoTrack.create_video_track("video", video_source) + audio_track = rtc.LocalAudioTrack.create_audio_track("audio", audio_source) + + # Publish tracks + video_options = rtc.TrackPublishOptions(source=rtc.TrackSource.SOURCE_CAMERA) + audio_options = rtc.TrackPublishOptions(source=rtc.TrackSource.SOURCE_MICROPHONE) + + await room.local_participant.publish_track(video_track, video_options) + await room.local_participant.publish_track(audio_track, audio_options) + + # Create AV synchronizer + av_sync = AVSynchronizer( + audio_source=audio_source, + video_source=video_source, + video_fps=media_info.video_fps, + video_queue_size_ms=queue_size_ms, + ) + + # Start audio generator + audio_queue = asyncio.Queue[Union[rtc.AudioFrame, _AudioEndSentinel]](maxsize=2) + audio_task = asyncio.create_task(audio_generator(media_info, audio_queue)) + + try: + async for video_frame, audio_frame in video_generator(media_info, audio_queue): + await av_sync.push(video_frame) + if audio_frame: + await av_sync.push(audio_frame) + finally: + audio_task.cancel() + await av_sync.aclose() + + +if __name__ == "__main__": + cli.run_app( + WorkerOptions( + entrypoint_fnc=entrypoint, + job_memory_warn_mb=400, + ) + ) diff --git a/examples/video-stream/av_sync_agent.py b/examples/video-stream/video_play.py similarity index 92% rename from examples/video-stream/av_sync_agent.py rename to examples/video-stream/video_play.py index 751e322d0..dbf709477 100644 --- a/examples/video-stream/av_sync_agent.py +++ b/examples/video-stream/video_play.py @@ -2,7 +2,7 @@ import logging from dataclasses import dataclass from pathlib import Path -from typing import AsyncIterable +from typing import AsyncIterable, Union import av import numpy as np @@ -29,7 +29,7 @@ class MediaInfo: class MediaFileStreamer: """Streams video and audio frames from a media file.""" - def __init__(self, media_file: str | Path) -> None: + def __init__(self, media_file: Union[str, Path]) -> None: self._media_file = str(media_file) self._container = av.open(self._media_file) @@ -40,7 +40,7 @@ def __init__(self, media_file: str | Path) -> None: self._info = MediaInfo( video_width=self._video_stream.width, video_height=self._video_stream.height, - video_fps=float(self._video_stream.average_rate), + video_fps=float(self._video_stream.average_rate), # type: ignore audio_sample_rate=self._audio_stream.sample_rate, audio_channels=self._audio_stream.channels, ) @@ -53,9 +53,9 @@ async def stream_video(self) -> AsyncIterable[rtc.VideoFrame]: """Streams video frames from the media file.""" container = av.open(self._media_file) try: - for frame in container.decode(video=0): + for av_frame in container.decode(video=0): # Convert video frame to RGBA - frame = frame.to_rgb().to_ndarray() + frame = av_frame.to_rgb().to_ndarray() frame_rgba = np.ones( (frame.shape[0], frame.shape[1], 4), dtype=np.uint8 ) @@ -73,9 +73,9 @@ async def stream_audio(self) -> AsyncIterable[rtc.AudioFrame]: """Streams audio frames from the media file.""" container = av.open(self._media_file) try: - for frame in container.decode(audio=0): + for av_frame in container.decode(audio=0): # Convert audio frame to raw int16 samples - frame: np.ndarray = frame.to_ndarray(format="s16") + frame = av_frame.to_ndarray() frame = (frame * 32768).astype(np.int16) yield rtc.AudioFrame( data=frame.tobytes(), @@ -96,7 +96,7 @@ async def entrypoint(job: JobContext): room = job.room # Create media streamer - media_path = "/path/to/sample.mp4" + media_path = "path/to/media/file" streamer = MediaFileStreamer(media_path) media_info = streamer.info From 74ce3596256cdd7de365415078e8159819fb0dda Mon Sep 17 00:00:00 2001 From: Long Chen Date: Fri, 6 Dec 2024 00:56:51 +0800 Subject: [PATCH 07/10] fix: update the example --- examples/video-stream/audio_wave.py | 62 +++++++++++-------- examples/video-stream/video_play.py | 21 +++---- .../livekit/agents/utils/av_sync.py | 31 ++++++---- 3 files changed, 65 insertions(+), 49 deletions(-) diff --git a/examples/video-stream/audio_wave.py b/examples/video-stream/audio_wave.py index f72f9da64..d06f4c9dd 100644 --- a/examples/video-stream/audio_wave.py +++ b/examples/video-stream/audio_wave.py @@ -46,7 +46,7 @@ async def audio_generator( amplitude = 0.5 period = 7.0 sine_duration = 5.0 # Duration of sine wave in each period - samples_per_frame = 1024 + chunk_size = 1024 while True: current_time = 0.0 @@ -55,8 +55,8 @@ async def audio_generator( while current_time < sine_duration: t = np.linspace( current_time, - current_time + samples_per_frame / media_info.audio_sample_rate, - samples_per_frame, + current_time + chunk_size / media_info.audio_sample_rate, + num=chunk_size, endpoint=False, ) # Create volume envelope using sine wave @@ -64,18 +64,19 @@ async def audio_generator( samples = amplitude * volume * np.sin(2 * np.pi * frequency * t) # Convert to int16 - samples = (samples * 32767).astype(np.int16) + samples = (samples[np.newaxis, :] * 32767).astype(np.int16) + if media_info.audio_channels > 1: + samples = np.repeat(samples, media_info.audio_channels, axis=0) # Create audio frame audio_frame = rtc.AudioFrame( data=samples.tobytes(), sample_rate=media_info.audio_sample_rate, - num_channels=1, - samples_per_channel=samples_per_frame, + num_channels=media_info.audio_channels, + samples_per_channel=chunk_size, ) - await output_audio.put(audio_frame) - current_time += samples_per_frame / media_info.audio_sample_rate + current_time += chunk_size / media_info.audio_sample_rate await asyncio.sleep(0) await output_audio.put(_AudioEndSentinel()) @@ -84,9 +85,9 @@ async def audio_generator( await asyncio.sleep(silence_duration) -def _draw_timestamp(canvas: np.ndarray, timestamp: float): +def _draw_timestamp(canvas: np.ndarray, duration: float, fps: float): height, width = canvas.shape[:2] - text = f"{timestamp:.2f}" + text = f"{duration:.1f}s @ {fps:.1f}fps" font_face = cv2.FONT_HERSHEY_SIMPLEX font_scale = 2.0 thickness = 2 @@ -99,7 +100,7 @@ def _draw_timestamp(canvas: np.ndarray, timestamp: float): cv2.putText(canvas, text, (x, y), font_face, font_scale, (0, 0, 0), thickness) -def _draw_volume(canvas: np.ndarray, audio_samples: np.ndarray): +def _draw_wave(canvas: np.ndarray, audio_samples: np.ndarray): """Draws an audio waveform visualization""" height, width = canvas.shape[:2] center_y = height // 2 + 100 @@ -127,30 +128,35 @@ def _draw_volume(canvas: np.ndarray, audio_samples: np.ndarray): async def video_generator( media_info: MediaInfo, input_audio: asyncio.Queue[Union[rtc.AudioFrame, _AudioEndSentinel]], + av_sync: AVSynchronizer, # only used for drawing the actual fps on the video ) -> AsyncIterable[tuple[rtc.VideoFrame, Optional[rtc.AudioFrame]]]: canvas = np.zeros( (media_info.video_height, media_info.video_width, 4), dtype=np.uint8 ) canvas.fill(255) - def _np_to_video_frame(canvas: np.ndarray) -> rtc.VideoFrame: + def _np_to_video_frame(image: np.ndarray) -> rtc.VideoFrame: return rtc.VideoFrame( - width=canvas.shape[1], - height=canvas.shape[0], + width=image.shape[1], + height=image.shape[0], type=rtc.VideoBufferType.RGBA, - data=canvas.tobytes(), + data=image.tobytes(), ) audio_samples_per_frame = int(media_info.audio_sample_rate / media_info.video_fps) audio_buffer = np.zeros((media_info.audio_channels, 0), dtype=np.int16) + start_time = time.time() while True: try: - audio_frame = input_audio.get_nowait() - except asyncio.QueueEmpty: - # silence frame + # timeout has to be shorter than the frame interval to avoid starvation + audio_frame = await asyncio.wait_for( + input_audio.get(), timeout=0.5 / media_info.video_fps + ) + except asyncio.TimeoutError: + # generate frame without audio (e.g. silence state) new_frame = canvas.copy() - _draw_timestamp(new_frame, time.time()) - _draw_volume(new_frame, np.zeros((1, 2))) + _draw_timestamp(new_frame, time.time() - start_time, av_sync.actual_fps) + _draw_wave(new_frame, np.zeros((1, 2))) video_frame = _np_to_video_frame(new_frame) yield video_frame, None @@ -159,22 +165,22 @@ def _np_to_video_frame(canvas: np.ndarray) -> rtc.VideoFrame: continue if isinstance(audio_frame, _AudioEndSentinel): - # reset the audio buffer + # drop the audio buffer when the audio finished audio_buffer = np.zeros((media_info.audio_channels, 0), dtype=np.int16) continue audio_samples = np.frombuffer(audio_frame.data, dtype=np.int16).reshape( audio_frame.num_channels, -1 ) - # append audio samples to the buffer + # accumulate audio samples to the buffer audio_buffer = np.concatenate([audio_buffer, audio_samples], axis=1) while audio_buffer.shape[1] >= audio_samples_per_frame: sub_samples = audio_buffer[:, :audio_samples_per_frame] audio_buffer = audio_buffer[:, audio_samples_per_frame:] new_frame = canvas.copy() - _draw_timestamp(new_frame, time.time()) - _draw_volume(new_frame, sub_samples) + _draw_timestamp(new_frame, time.time() - start_time, av_sync.actual_fps) + _draw_wave(new_frame, sub_samples) video_frame = _np_to_video_frame(new_frame) sub_audio_frame = rtc.AudioFrame( data=sub_samples.tobytes(), @@ -199,7 +205,7 @@ async def entrypoint(job: JobContext): ) # Create video and audio sources/tracks - queue_size_ms = 100 + queue_size_ms = 50 video_source = rtc.VideoSource( width=media_info.video_width, height=media_info.video_height, @@ -229,11 +235,13 @@ async def entrypoint(job: JobContext): ) # Start audio generator - audio_queue = asyncio.Queue[Union[rtc.AudioFrame, _AudioEndSentinel]](maxsize=2) + audio_queue = asyncio.Queue[Union[rtc.AudioFrame, _AudioEndSentinel]](maxsize=1) audio_task = asyncio.create_task(audio_generator(media_info, audio_queue)) try: - async for video_frame, audio_frame in video_generator(media_info, audio_queue): + async for video_frame, audio_frame in video_generator( + media_info, audio_queue, av_sync=av_sync + ): await av_sync.push(video_frame) if audio_frame: await av_sync.push(audio_frame) diff --git a/examples/video-stream/video_play.py b/examples/video-stream/video_play.py index dbf709477..940aa75a9 100644 --- a/examples/video-stream/video_play.py +++ b/examples/video-stream/video_play.py @@ -96,7 +96,8 @@ async def entrypoint(job: JobContext): room = job.room # Create media streamer - media_path = "path/to/media/file" + # Should we add a sample video file? + media_path = "/path/to/sample/video.mp4" streamer = MediaFileStreamer(media_path) media_info = streamer.info @@ -122,14 +123,6 @@ async def entrypoint(job: JobContext): await room.local_participant.publish_track(video_track, video_options) await room.local_participant.publish_track(audio_track, audio_options) - # Create AV synchronizer - av_sync = AVSynchronizer( - audio_source=audio_source, - video_source=video_source, - video_fps=media_info.video_fps, - video_queue_size_ms=queue_size_ms, - ) - @utils.log_exceptions(logger=logger) async def _push_video_frames( video_stream: AsyncIterable[rtc.VideoFrame], av_sync: AVSynchronizer @@ -148,6 +141,13 @@ async def _push_audio_frames( try: while True: + av_sync = AVSynchronizer( + audio_source=audio_source, + video_source=video_source, + video_fps=media_info.video_fps, + video_queue_size_ms=queue_size_ms, + ) + # Create and run video and audio streaming tasks video_stream = streamer.stream_video() audio_stream = streamer.stream_audio() @@ -156,10 +156,9 @@ async def _push_audio_frames( audio_task = asyncio.create_task(_push_audio_frames(audio_stream, av_sync)) # Wait for both tasks to complete - # TODO: wait the frame in buffer to be processed await asyncio.gather(video_task, audio_task) + await av_sync.aclose() finally: - await av_sync.aclose() await streamer.aclose() diff --git a/livekit-agents/livekit/agents/utils/av_sync.py b/livekit-agents/livekit/agents/utils/av_sync.py index 4bda2880c..6ee8b0d64 100644 --- a/livekit-agents/livekit/agents/utils/av_sync.py +++ b/livekit-agents/livekit/agents/utils/av_sync.py @@ -31,7 +31,7 @@ def __init__( audio_source: rtc.AudioSource, video_source: rtc.VideoSource, video_fps: float, - video_queue_size_ms: float = 1000, + video_queue_size_ms: float = 100, _max_delay_tolerance_ms: float = 300, ): self._audio_source = audio_source @@ -45,9 +45,17 @@ def __init__( self._video_queue_max_size = int( self._video_fps * self._video_queue_size_ms / 1000 ) + if self._video_queue_size_ms > 0: + # ensure queue is bounded if queue size is specified + self._video_queue_max_size = max(1, self._video_queue_max_size) + self._video_queue = asyncio.Queue[rtc.VideoFrame]( maxsize=self._video_queue_max_size ) + self._fps_controller = _FPSController( + expected_fps=self._video_fps, + max_delay_tolerance_ms=self._max_delay_tolerance_ms, + ) self._capture_video_task = asyncio.create_task(self._capture_video()) async def push(self, frame: Union[rtc.VideoFrame, rtc.AudioFrame]) -> None: @@ -58,14 +66,15 @@ async def push(self, frame: Union[rtc.VideoFrame, rtc.AudioFrame]) -> None: await self._video_queue.put(frame) + async def clear_queue(self) -> None: + self._audio_source.clear_queue() + while not self._video_queue.empty(): + await self._video_queue.get() + async def _capture_video(self) -> None: - fps_controller = _FPSController( - expected_fps=self._video_fps, - max_delay_tolerance_ms=self._max_delay_tolerance_ms, - ) while not self._stopped: frame = await self._video_queue.get() - async with fps_controller: + async with self._fps_controller: self._video_source.capture_frame(frame) async def aclose(self) -> None: @@ -73,6 +82,10 @@ async def aclose(self) -> None: if self._capture_video_task: await utils.aio.gracefully_cancel(self._capture_video_task) + @property + def actual_fps(self) -> float: + return self._fps_controller.actual_fps + class _FPSController: def __init__( @@ -94,7 +107,7 @@ def __init__( self._max_delay_tolerance_secs = max_delay_tolerance_ms / 1000 self._next_frame_time: Optional[float] = None - self._fps_calc_winsize = max(2, int(0.5 * expected_fps)) + self._fps_calc_winsize = max(2, int(1.0 * expected_fps)) self._send_timestamps: deque[float] = deque(maxlen=self._fps_calc_winsize) async def __aenter__(self) -> None: @@ -119,10 +132,6 @@ async def wait_next_process(self) -> None: if sleep_time > 0: await asyncio.sleep(sleep_time) else: - logger.debug( - "Sync state", - extra={"sleep_time": sleep_time, "fps": self.actual_fps}, - ) # check if significantly behind schedule if -sleep_time > self._max_delay_tolerance_secs: logger.warning( From b01b5392baab34d745804da46e8deaaa32765305 Mon Sep 17 00:00:00 2001 From: Long Chen Date: Sat, 7 Dec 2024 12:46:20 +0800 Subject: [PATCH 08/10] fix: add wait_for_playout for av sync --- examples/video-stream/README.md | 5 + examples/video-stream/audio_wave.py | 137 +++++++++++------- examples/video-stream/video_play.py | 102 +++++++------ .../livekit/agents/utils/av_sync.py | 7 +- 4 files changed, 149 insertions(+), 102 deletions(-) create mode 100644 examples/video-stream/README.md diff --git a/examples/video-stream/README.md b/examples/video-stream/README.md new file mode 100644 index 000000000..791bab0e9 --- /dev/null +++ b/examples/video-stream/README.md @@ -0,0 +1,5 @@ +# Video and audio synchronization + +This example demonstrates how to synchronize video and audio to a LiveKit room. + + diff --git a/examples/video-stream/audio_wave.py b/examples/video-stream/audio_wave.py index d06f4c9dd..852969181 100644 --- a/examples/video-stream/audio_wave.py +++ b/examples/video-stream/audio_wave.py @@ -1,6 +1,7 @@ import asyncio import logging import time +from collections import deque from dataclasses import dataclass from typing import AsyncIterable, Optional, Union @@ -63,17 +64,17 @@ async def audio_generator( volume = np.abs(np.sin(2 * np.pi * current_time / sine_duration)) samples = amplitude * volume * np.sin(2 * np.pi * frequency * t) - # Convert to int16 - samples = (samples[np.newaxis, :] * 32767).astype(np.int16) + # Convert to int16, (samples, channels) + samples = (samples[:, np.newaxis] * 32767).astype(np.int16) if media_info.audio_channels > 1: - samples = np.repeat(samples, media_info.audio_channels, axis=0) + samples = np.repeat(samples, media_info.audio_channels, axis=1) # Create audio frame audio_frame = rtc.AudioFrame( data=samples.tobytes(), sample_rate=media_info.audio_sample_rate, - num_channels=media_info.audio_channels, - samples_per_channel=chunk_size, + num_channels=samples.shape[1], + samples_per_channel=samples.shape[0], ) await output_audio.put(audio_frame) current_time += chunk_size / media_info.audio_sample_rate @@ -85,44 +86,73 @@ async def audio_generator( await asyncio.sleep(silence_duration) -def _draw_timestamp(canvas: np.ndarray, duration: float, fps: float): - height, width = canvas.shape[:2] - text = f"{duration:.1f}s @ {fps:.1f}fps" - font_face = cv2.FONT_HERSHEY_SIMPLEX - font_scale = 2.0 - thickness = 2 +class WaveformVisualizer: + def __init__(self, history_length: int = 1000): + self.history_length = history_length + self.volume_history: deque[float] = deque(maxlen=history_length) + self.start_time = time.time() - (text_width, text_height), baseline = cv2.getTextSize( - text, font_face, font_scale, thickness - ) - x = (width - text_width) // 2 - y = int((height - text_height) * 0.4 + baseline) - cv2.putText(canvas, text, (x, y), font_face, font_scale, (0, 0, 0), thickness) - - -def _draw_wave(canvas: np.ndarray, audio_samples: np.ndarray): - """Draws an audio waveform visualization""" - height, width = canvas.shape[:2] - center_y = height // 2 + 100 - - # Normalize audio samples to [-1, 1] - normalized_samples = audio_samples.astype(np.float32) / 32767.0 + def draw_timestamp(self, canvas: np.ndarray, fps: float): + height, width = canvas.shape[:2] + text = f"{time.time() - self.start_time:.1f}s @ {fps:.1f}fps" + font_face = cv2.FONT_HERSHEY_SIMPLEX + font_scale = 2.0 + thickness = 2 - num_points = min(width, len(normalized_samples[0])) - if len(normalized_samples[0]) > num_points: - indices = np.linspace(0, len(normalized_samples[0]) - 1, num_points, dtype=int) - plot_data = normalized_samples[0][indices] - else: - plot_data = normalized_samples[0] + (text_width, text_height), baseline = cv2.getTextSize( + text, font_face, font_scale, thickness + ) + x = (width - text_width) // 2 + y = int((height - text_height) * 0.4 + baseline) + cv2.putText(canvas, text, (x, y), font_face, font_scale, (0, 0, 0), thickness) + + def draw_current_wave( + self, canvas: np.ndarray, audio_samples: np.ndarray + ) -> np.ndarray: + """Draw the current waveform and return the current values""" + height, width = canvas.shape[:2] + center_y = height // 2 + 100 + + normalized_samples = audio_samples.astype(np.float32) / 32767.0 + normalized_samples = normalized_samples.mean(axis=1) # (samples,) + num_points = min(width, len(normalized_samples)) + + if len(normalized_samples) > num_points: + indices = np.linspace(0, len(normalized_samples) - 1, num_points, dtype=int) + plot_data = normalized_samples[indices] + else: + plot_data = normalized_samples + + x_coords = np.linspace(0, width, num_points, dtype=int) + y_coords = (plot_data * 200) + center_y + + cv2.line(canvas, (0, center_y), (width, center_y), (200, 200, 200), 1) + points = np.column_stack((x_coords, y_coords.astype(int))) + for i in range(len(points) - 1): + cv2.line(canvas, tuple(points[i]), tuple(points[i + 1]), (0, 255, 0), 2) + + return plot_data + + def draw_volume_history(self, canvas: np.ndarray, current_volume: float): + height, width = canvas.shape[:2] + center_y = height // 2 + + self.volume_history.append(current_volume) + cv2.line( + canvas, (0, center_y - 250), (width, center_y - 250), (200, 200, 200), 1 + ) - x_coords = np.linspace(0, width, num_points, dtype=int) - y_coords = (plot_data * 200) + center_y # Scale the wave amplitude + volume_x = np.linspace(0, width, len(self.volume_history), dtype=int) + volume_y = center_y - 250 + (np.array(self.volume_history) * 200) + points = np.column_stack((volume_x, volume_y.astype(int))) + for i in range(len(points) - 1): + cv2.line(canvas, tuple(points[i]), tuple(points[i + 1]), (255, 0, 0), 2) - # Draw the center line and waveform - cv2.line(canvas, (0, center_y), (width, center_y), (200, 200, 200), 1) - points = np.column_stack((x_coords, y_coords.astype(int))) - for i in range(len(points) - 1): - cv2.line(canvas, tuple(points[i]), tuple(points[i + 1]), (0, 255, 0), 2) + def draw(self, canvas: np.ndarray, audio_samples: np.ndarray, fps: float): + self.draw_timestamp(canvas, fps) + plot_data = self.draw_current_wave(canvas, audio_samples) + current_volume = np.abs(plot_data).mean() + self.draw_volume_history(canvas, current_volume) async def video_generator( @@ -144,8 +174,8 @@ def _np_to_video_frame(image: np.ndarray) -> rtc.VideoFrame: ) audio_samples_per_frame = int(media_info.audio_sample_rate / media_info.video_fps) - audio_buffer = np.zeros((media_info.audio_channels, 0), dtype=np.int16) - start_time = time.time() + audio_buffer = np.zeros((0, media_info.audio_channels), dtype=np.int16) + wave_visualizer = WaveformVisualizer() while True: try: # timeout has to be shorter than the frame interval to avoid starvation @@ -155,8 +185,7 @@ def _np_to_video_frame(image: np.ndarray) -> rtc.VideoFrame: except asyncio.TimeoutError: # generate frame without audio (e.g. silence state) new_frame = canvas.copy() - _draw_timestamp(new_frame, time.time() - start_time, av_sync.actual_fps) - _draw_wave(new_frame, np.zeros((1, 2))) + wave_visualizer.draw(new_frame, np.zeros((1, 2)), av_sync.actual_fps) video_frame = _np_to_video_frame(new_frame) yield video_frame, None @@ -170,23 +199,23 @@ def _np_to_video_frame(image: np.ndarray) -> rtc.VideoFrame: continue audio_samples = np.frombuffer(audio_frame.data, dtype=np.int16).reshape( - audio_frame.num_channels, -1 - ) + -1, audio_frame.num_channels + ) # (samples, channels) # accumulate audio samples to the buffer - audio_buffer = np.concatenate([audio_buffer, audio_samples], axis=1) - while audio_buffer.shape[1] >= audio_samples_per_frame: - sub_samples = audio_buffer[:, :audio_samples_per_frame] - audio_buffer = audio_buffer[:, audio_samples_per_frame:] + audio_buffer = np.concatenate([audio_buffer, audio_samples], axis=0) + + while audio_buffer.shape[0] >= audio_samples_per_frame: + sub_samples = audio_buffer[:audio_samples_per_frame, :] + audio_buffer = audio_buffer[audio_samples_per_frame:, :] new_frame = canvas.copy() - _draw_timestamp(new_frame, time.time() - start_time, av_sync.actual_fps) - _draw_wave(new_frame, sub_samples) + wave_visualizer.draw(new_frame, sub_samples, av_sync.actual_fps) video_frame = _np_to_video_frame(new_frame) sub_audio_frame = rtc.AudioFrame( data=sub_samples.tobytes(), sample_rate=audio_frame.sample_rate, - num_channels=audio_frame.num_channels, - samples_per_channel=sub_samples.shape[1], + num_channels=sub_samples.shape[1], + samples_per_channel=sub_samples.shape[0], ) yield video_frame, sub_audio_frame @@ -201,7 +230,7 @@ async def entrypoint(job: JobContext): video_height=720, video_fps=30.0, audio_sample_rate=48000, - audio_channels=1, + audio_channels=2, ) # Create video and audio sources/tracks diff --git a/examples/video-stream/video_play.py b/examples/video-stream/video_play.py index 940aa75a9..c65743e1a 100644 --- a/examples/video-stream/video_play.py +++ b/examples/video-stream/video_play.py @@ -27,22 +27,24 @@ class MediaInfo: class MediaFileStreamer: - """Streams video and audio frames from a media file.""" + """Streams video and audio frames from a media file in an endless loop.""" def __init__(self, media_file: Union[str, Path]) -> None: self._media_file = str(media_file) - self._container = av.open(self._media_file) - - self._video_stream = self._container.streams.video[0] - self._audio_stream = self._container.streams.audio[0] + # Create separate containers for each stream + self._video_container = av.open(self._media_file) + self._audio_container = av.open(self._media_file) + self._stopped = False # Cache media info + video_stream = self._video_container.streams.video[0] + audio_stream = self._audio_container.streams.audio[0] self._info = MediaInfo( - video_width=self._video_stream.width, - video_height=self._video_stream.height, - video_fps=float(self._video_stream.average_rate), # type: ignore - audio_sample_rate=self._audio_stream.sample_rate, - audio_channels=self._audio_stream.channels, + video_width=video_stream.width, + video_height=video_stream.height, + video_fps=float(video_stream.average_rate), # type: ignore + audio_sample_rate=audio_stream.sample_rate, + audio_channels=audio_stream.channels, ) @property @@ -50,10 +52,12 @@ def info(self) -> MediaInfo: return self._info async def stream_video(self) -> AsyncIterable[rtc.VideoFrame]: - """Streams video frames from the media file.""" - container = av.open(self._media_file) - try: - for av_frame in container.decode(video=0): + """Streams video frames from the media file in an endless loop.""" + while not self._stopped: + self._video_container.seek(0) # Seek back to start + for av_frame in self._video_container.decode(video=0): + if self._stopped: + break # Convert video frame to RGBA frame = av_frame.to_rgb().to_ndarray() frame_rgba = np.ones( @@ -66,29 +70,29 @@ async def stream_video(self) -> AsyncIterable[rtc.VideoFrame]: type=rtc.VideoBufferType.RGBA, data=frame_rgba.tobytes(), ) - finally: - container.close() async def stream_audio(self) -> AsyncIterable[rtc.AudioFrame]: - """Streams audio frames from the media file.""" - container = av.open(self._media_file) - try: - for av_frame in container.decode(audio=0): + """Streams audio frames from the media file in an endless loop.""" + while not self._stopped: + self._audio_container.seek(0) # Seek back to start + for av_frame in self._audio_container.decode(audio=0): + if self._stopped: + break # Convert audio frame to raw int16 samples - frame = av_frame.to_ndarray() + frame = av_frame.to_ndarray().T # Transpose to (samples, channels) frame = (frame * 32768).astype(np.int16) yield rtc.AudioFrame( data=frame.tobytes(), sample_rate=self.info.audio_sample_rate, - num_channels=frame.shape[0], - samples_per_channel=frame.shape[1], + num_channels=frame.shape[1], + samples_per_channel=frame.shape[0], ) - finally: - container.close() async def aclose(self) -> None: - """Closes the media container.""" - self._container.close() + """Closes the media container and stops streaming.""" + self._stopped = True + self._video_container.close() + self._audio_container.close() async def entrypoint(job: JobContext): @@ -97,16 +101,17 @@ async def entrypoint(job: JobContext): # Create media streamer # Should we add a sample video file? - media_path = "/path/to/sample/video.mp4" + media_path = "/path/to/video.mp4" streamer = MediaFileStreamer(media_path) media_info = streamer.info # Create video and audio sources/tracks - queue_size_ms = 100 + queue_size_ms = 1000 # 1 second video_source = rtc.VideoSource( width=media_info.video_width, height=media_info.video_height, ) + print(media_info) audio_source = rtc.AudioSource( sample_rate=media_info.audio_sample_rate, num_channels=media_info.audio_channels, @@ -130,6 +135,7 @@ async def _push_video_frames( """Task to push video frames to the AV synchronizer.""" async for frame in video_stream: await av_sync.push(frame) + await asyncio.sleep(0) @utils.log_exceptions(logger=logger) async def _push_audio_frames( @@ -138,28 +144,30 @@ async def _push_audio_frames( """Task to push audio frames to the AV synchronizer.""" async for frame in audio_stream: await av_sync.push(frame) + await asyncio.sleep(0) try: - while True: - av_sync = AVSynchronizer( - audio_source=audio_source, - video_source=video_source, - video_fps=media_info.video_fps, - video_queue_size_ms=queue_size_ms, - ) - - # Create and run video and audio streaming tasks - video_stream = streamer.stream_video() - audio_stream = streamer.stream_audio() - - video_task = asyncio.create_task(_push_video_frames(video_stream, av_sync)) - audio_task = asyncio.create_task(_push_audio_frames(audio_stream, av_sync)) - - # Wait for both tasks to complete - await asyncio.gather(video_task, audio_task) - await av_sync.aclose() + av_sync = AVSynchronizer( + audio_source=audio_source, + video_source=video_source, + video_fps=media_info.video_fps, + video_queue_size_ms=queue_size_ms, + ) + + # Create and run video and audio streaming tasks + video_stream = streamer.stream_video() + audio_stream = streamer.stream_audio() + + video_task = asyncio.create_task(_push_video_frames(video_stream, av_sync)) + audio_task = asyncio.create_task(_push_audio_frames(audio_stream, av_sync)) + + # Wait for both tasks to complete + await asyncio.gather(video_task, audio_task) + await av_sync.wait_for_playout() + finally: await streamer.aclose() + await av_sync.aclose() if __name__ == "__main__": diff --git a/livekit-agents/livekit/agents/utils/av_sync.py b/livekit-agents/livekit/agents/utils/av_sync.py index 6ee8b0d64..3491d7019 100644 --- a/livekit-agents/livekit/agents/utils/av_sync.py +++ b/livekit-agents/livekit/agents/utils/av_sync.py @@ -60,7 +60,6 @@ def __init__( async def push(self, frame: Union[rtc.VideoFrame, rtc.AudioFrame]) -> None: if isinstance(frame, rtc.AudioFrame): - # TODO: test if frame duration is too long await self._audio_source.capture_frame(frame) return @@ -71,11 +70,17 @@ async def clear_queue(self) -> None: while not self._video_queue.empty(): await self._video_queue.get() + async def wait_for_playout(self) -> None: + """Wait until all video and audio frames are played out.""" + await self._audio_source.wait_for_playout() + await self._video_queue.join() + async def _capture_video(self) -> None: while not self._stopped: frame = await self._video_queue.get() async with self._fps_controller: self._video_source.capture_frame(frame) + self._video_queue.task_done() async def aclose(self) -> None: self._stopped = True From f8cb5ce0ab25880c6d9481434adb48053dd6c10b Mon Sep 17 00:00:00 2001 From: Long Chen Date: Sat, 7 Dec 2024 13:02:36 +0800 Subject: [PATCH 09/10] fix: update readme for av sync --- examples/video-stream/README.md | 39 +++++++++++++++++++++++++++++++-- 1 file changed, 37 insertions(+), 2 deletions(-) diff --git a/examples/video-stream/README.md b/examples/video-stream/README.md index 791bab0e9..44d58ae71 100644 --- a/examples/video-stream/README.md +++ b/examples/video-stream/README.md @@ -1,5 +1,40 @@ -# Video and audio synchronization +# Video and Audio Synchronization Examples -This example demonstrates how to synchronize video and audio to a LiveKit room. +This example demonstrates how to synchronize video and audio streams using the `AVSynchronizer` utility. +## AVSynchronizer Usage +The `AVSynchronizer` helps maintain synchronization between video and audio frames. The key principle is to push the initial synchronized video and audio frames together. After that, subsequent frames will be automatically synchronized according to the configured video FPS and audio sample rate. + +```python +av_sync = AVSynchronizer( + audio_source=audio_source, + video_source=video_source, + video_fps=30.0, + video_queue_size_ms=100 +) + +# Push frames to synchronizer +await av_sync.push(video_frame) +await av_sync.push(audio_frame) +``` + +## Examples + +### 1. Video File Playback (`video_play.py`) +Shows how to stream video and audio from separate sources while maintaining sync: + +- Reads video and audio streams separately from a media file +- Uses separate tasks to push video and audio frames to the synchronizer +- Since the streams are continuous, a larger `queue_size_ms` can be used, though this will increase memory usage + +### 2. Audio Visualization (`audio_wave.py`) +Demonstrates generating video based on audio input: + +- Generates audio frames with alternating sine waves and silence +- Creates video frames visualizing the audio waveform +- Shows how to handle cases with and without audio: + - When audio is present: Push synchronized video and audio frames + - During silence: Push only video frames +- Since video and audio frames are pushed in the same loop, audio frames must be smaller than the audio source queue size to avoid blocking +- Uses a small `queue_size_ms` (e.g. 50ms) to control frame generation speed during silence periods From dcb5190bec24bb3c09a403c54d071d10f24138a4 Mon Sep 17 00:00:00 2001 From: Long Chen Date: Mon, 9 Dec 2024 10:07:33 +0800 Subject: [PATCH 10/10] fix: fix audio wave example --- examples/video-stream/audio_wave.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/video-stream/audio_wave.py b/examples/video-stream/audio_wave.py index 852969181..758d59050 100644 --- a/examples/video-stream/audio_wave.py +++ b/examples/video-stream/audio_wave.py @@ -195,7 +195,7 @@ def _np_to_video_frame(image: np.ndarray) -> rtc.VideoFrame: if isinstance(audio_frame, _AudioEndSentinel): # drop the audio buffer when the audio finished - audio_buffer = np.zeros((media_info.audio_channels, 0), dtype=np.int16) + audio_buffer = np.zeros((0, media_info.audio_channels), dtype=np.int16) continue audio_samples = np.frombuffer(audio_frame.data, dtype=np.int16).reshape(