Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix av sync example #338

Merged
merged 7 commits into from
Jan 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 58 additions & 22 deletions examples/video-stream/video_play.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,31 +56,38 @@ def __init__(self, media_file: Union[str, Path]) -> None:
# Accessor for the media metadata stored on the instance as `self._info`.
def info(self) -> MediaInfo:
return self._info

async def stream_video(self) -> AsyncIterable[tuple[rtc.VideoFrame, float]]:
    """Decode the first video stream and yield each frame with its timestamp.

    This makes a single pass over the container. Looping the media is left to
    the caller, which has to rewind the streamer and call this method again.

    Yields:
        ``(frame, timestamp)`` pairs. ``frame`` is an RGBA ``rtc.VideoFrame``;
        ``timestamp`` is ``av_frame.time`` as reported by the decoder.
        NOTE(review): PyAV may report ``None`` for frames without a pts --
        confirm whether inputs without timestamps need to be supported.
    """
    for av_frame in self._video_container.decode(video=0):
        # Decode to an RGB array, then widen it to RGBA.
        rgb = av_frame.to_rgb().to_ndarray()
        height, width = rgb.shape[:2]
        # Pre-fill with 255 so the alpha channel is fully opaque; the colour
        # channels are overwritten on the next line.
        rgba = np.full((height, width, 4), 255, dtype=np.uint8)
        rgba[:, :, :3] = rgb
        yield (
            rtc.VideoFrame(
                width=width,
                height=height,
                type=rtc.VideoBufferType.RGBA,
                data=rgba.tobytes(),
            ),
            av_frame.time,
        )

async def stream_audio(self) -> AsyncIterable[tuple[rtc.AudioFrame, float]]:
    """Decode the first audio stream and yield each frame with its end time.

    This makes a single pass over the container. Looping the media is left to
    the caller, which has to rewind the streamer and call this method again.

    Yields:
        ``(frame, end_time)`` pairs. ``frame`` is an int16 ``rtc.AudioFrame``;
        ``end_time`` is ``av_frame.time`` plus the frame's duration, i.e. the
        time at which the frame finishes playing.
    """
    sample_rate = self.info.audio_sample_rate
    for av_frame in self._audio_container.decode(audio=0):
        # Transpose to (samples, channels).
        samples = av_frame.to_ndarray().T
        # Scale to the int16 range. Clip before casting so that a full-scale
        # +1.0 sample saturates at 32767 instead of overflowing int16.
        # NOTE(review): assumes the decoder yields float samples in [-1, 1] --
        # confirm for the media formats this example is meant to support.
        pcm = np.clip(samples * 32768, -32768, 32767).astype(np.int16)
        duration = pcm.shape[0] / sample_rate
        yield (
            rtc.AudioFrame(
                data=pcm.tobytes(),
                sample_rate=sample_rate,
                num_channels=pcm.shape[1],
                samples_per_channel=pcm.shape[0],
            ),
            av_frame.time + duration,
        )

def reset(self):
Expand All @@ -102,6 +109,7 @@ async def main(room: rtc.Room, room_name: str, media_path: str):
api.VideoGrants(
room_join=True,
room=room_name,
agent=True,
)
)
.to_jwt()
Expand All @@ -121,7 +129,7 @@ async def main(room: rtc.Room, room_name: str, media_path: str):
media_info = streamer.info

# Create video and audio sources/tracks
queue_size_ms = 1000 # 1 second
queue_size_ms = 1000
video_source = rtc.VideoSource(
width=media_info.video_width,
height=media_info.video_height,
Expand Down Expand Up @@ -157,26 +165,54 @@ async def main(room: rtc.Room, room_name: str, media_path: str):
)

async def _push_frames(
    stream: AsyncIterable[tuple[rtc.VideoFrame | rtc.AudioFrame, float]],
    av_sync: rtc.AVSynchronizer,
):
    """Forward every ``(frame, timestamp)`` pair from ``stream`` to ``av_sync``.

    Args:
        stream: Async iterable yielding ``(frame, timestamp)`` tuples.
        av_sync: Synchronizer that receives each frame via ``push``.
    """
    async for frame, timestamp in stream:
        await av_sync.push(frame, timestamp)
        # Yield to the event loop after every frame so that other tasks
        # (for example the sibling audio/video push task) get a turn.
        await asyncio.sleep(0)

# Periodically log playback statistics for `av_sync`: the measured FPS, the
# elapsed wall-clock time since this coroutine started, the last video and
# audio timestamps, and the difference between them.
# The loop never exits on its own; the task running it has to be cancelled.
async def _log_fps(av_sync: rtc.AVSynchronizer):
start_time = asyncio.get_running_loop().time()
while True:
# Emit one report every 2 seconds.
await asyncio.sleep(2)
wall_time = asyncio.get_running_loop().time() - start_time
# A positive value means the video timestamp is ahead of the audio one.
diff = av_sync.last_video_time - av_sync.last_audio_time
logger.info(
f"fps: {av_sync.actual_fps:.2f}, wall_time: {wall_time:.3f}s, "
f"video_time: {av_sync.last_video_time:.3f}s, "
f"audio_time: {av_sync.last_audio_time:.3f}s, diff: {diff:.3f}s"
)

try:
while True:
streamer.reset()
video_task = asyncio.create_task(
_push_frames(streamer.stream_video(), av_sync)
)
audio_task = asyncio.create_task(
_push_frames(streamer.stream_audio(), av_sync)

video_stream = streamer.stream_video()
audio_stream = streamer.stream_audio()

# read the head frames and push them at the same time
first_video_frame, video_timestamp = await video_stream.__anext__()
first_audio_frame, audio_timestamp = await audio_stream.__anext__()
logger.info(
f"first video duration: {1/media_info.video_fps:.3f}s, "
f"first audio duration: {first_audio_frame.duration:.3f}s"
)
await av_sync.push(first_video_frame, video_timestamp)
await av_sync.push(first_audio_frame, audio_timestamp)

video_task = asyncio.create_task(_push_frames(video_stream, av_sync))
audio_task = asyncio.create_task(_push_frames(audio_stream, av_sync))

log_fps_task = asyncio.create_task(_log_fps(av_sync))

# wait for both tasks to complete
await asyncio.gather(video_task, audio_task)
await av_sync.wait_for_playout()

# clean up
av_sync.reset()
log_fps_task.cancel()
logger.info("playout finished")
finally:
await streamer.aclose()
Expand Down
49 changes: 43 additions & 6 deletions livekit-rtc/livekit/rtc/synchronizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from .audio_source import AudioSource
from .video_source import VideoSource


logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -43,6 +44,9 @@ def __init__(
self._max_delay_tolerance_ms = _max_delay_tolerance_ms

self._stopped = False
# the time of the last video/audio frame captured
self._last_video_time: float = 0
self._last_audio_time: float = 0

self._video_queue_max_size = int(
self._video_fps * self._video_queue_size_ms / 1000
Expand All @@ -51,7 +55,7 @@ def __init__(
# ensure queue is bounded if queue size is specified
self._video_queue_max_size = max(1, self._video_queue_max_size)

self._video_queue = asyncio.Queue[VideoFrame](
self._video_queue = asyncio.Queue[tuple[VideoFrame, Optional[float]]](
maxsize=self._video_queue_max_size
)
self._fps_controller = _FPSController(
Expand All @@ -60,28 +64,47 @@ def __init__(
)
self._capture_video_task = asyncio.create_task(self._capture_video())

async def push(
    self, frame: Union[VideoFrame, AudioFrame], timestamp: Optional[float] = None
) -> None:
    """Push a frame to the synchronizer.

    Audio frames are handed to the audio source right away. Video frames are
    queued together with their timestamp and captured later by the video
    capture task.

    Args:
        frame: The video or audio frame to push.
        timestamp: (optional) The timestamp of the frame. It is only recorded
            so that ``last_video_time`` / ``last_audio_time`` can report it.
            For an ``AudioFrame`` it should be the end time of the frame.
    """
    if isinstance(frame, AudioFrame):
        await self._audio_source.capture_frame(frame)
        if timestamp is not None:
            self._last_audio_time = timestamp
        return

    # Always enqueue a (frame, timestamp) tuple: the consumer unpacks two values.
    await self._video_queue.put((frame, timestamp))

# Discard everything that is still pending: clear the audio source's queue,
# then drain the video queue.
async def clear_queue(self) -> None:
self._audio_source.clear_queue()
while not self._video_queue.empty():
await self._video_queue.get()
# Mark the discarded frame as done so that anything waiting on the queue's
# join() is not left waiting for it.
self._video_queue.task_done()

async def wait_for_playout(self) -> None:
    """Wait until all video and audio frames are played out.

    The audio source's playout and the video queue's ``join()`` are awaited
    concurrently, so this returns once both have completed.
    """
    await asyncio.gather(
        self._audio_source.wait_for_playout(),
        self._video_queue.join(),
    )

# Reset the FPS controller. The recorded last video/audio times are not
# modified here.
def reset(self) -> None:
self._fps_controller.reset()

async def _capture_video(self) -> None:
    """Consume queued video frames and hand them to the video source.

    Runs until ``self._stopped`` is set. Each queue item is a
    ``(frame, timestamp)`` tuple; when a timestamp is present it is recorded
    as the last captured video time.
    """
    while not self._stopped:
        frame, timestamp = await self._video_queue.get()
        try:
            # Entering the FPS controller context happens before every capture.
            # NOTE(review): presumably this is what paces captures to the
            # target FPS -- confirm against _FPSController.__aenter__.
            async with self._fps_controller:
                self._video_source.capture_frame(frame)
                if timestamp is not None:
                    self._last_video_time = timestamp
        finally:
            # Pair every get() with task_done(), even when the capture raises
            # or this task is cancelled while waiting inside the controller;
            # otherwise the item stays unfinished and join() waiters hang on it.
            self._video_queue.task_done()

async def aclose(self) -> None:
Expand All @@ -93,6 +116,16 @@ async def aclose(self) -> None:
# Expose the FPS controller's `actual_fps` value.
def actual_fps(self) -> float:
return self._fps_controller.actual_fps

@property
def last_video_time(self) -> float:
"""The time of the last video frame captured"""
# Starts at 0 and is only updated when a captured frame carried a timestamp.
return self._last_video_time

@property
def last_audio_time(self) -> float:
"""The time of the last audio frame played out"""
# Timestamp of the last pushed audio frame minus the audio source's
# `queued_duration`, i.e. audio that was pushed but is still queued is
# subtracted out.
# NOTE(review): the stored time stays 0 when frames are pushed without a
# timestamp, so this can go negative -- confirm whether callers expect that.
# NOTE(review): presumably both values are in seconds -- verify against
# AudioSource.queued_duration.
return self._last_audio_time - self._audio_source.queued_duration


class _FPSController:
def __init__(
Expand Down Expand Up @@ -123,6 +156,10 @@ async def __aenter__(self) -> None:
# Async context-manager exit hook: calls after_process() whether or not the
# managed body raised. It returns None, so an exception from the body is not
# suppressed.
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
self.after_process()

# Clear the controller's state: forget the next-frame time and empty the
# recorded send timestamps.
# NOTE(review): presumably this restarts both pacing and the FPS measurement
# from scratch -- confirm against wait_next_process / actual_fps.
def reset(self) -> None:
self._next_frame_time = None
self._send_timestamps.clear()

async def wait_next_process(self) -> None:
"""Wait until it's time for the next frame.

Expand Down
Loading