Skip to content

Commit

Permalink
decrease buffer size and print timestamps
Browse files Browse the repository at this point in the history
  • Loading branch information
longcw committed Dec 30, 2024
1 parent 1738a10 commit 1920690
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 24 deletions.
62 changes: 42 additions & 20 deletions examples/video-stream/video_play.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@

try:
import av
import cv2
except ImportError:
raise RuntimeError(
"av is required to run this example, install with `pip install av`"
"av and opencv-python is required to run this example, install with `pip install av opencv-python`"
)

# ensure LIVEKIT_URL, LIVEKIT_API_KEY, and LIVEKIT_API_SECRET are set
Expand Down Expand Up @@ -51,36 +52,56 @@ def __init__(self, media_file: Union[str, Path]) -> None:
audio_sample_rate=audio_stream.sample_rate,
audio_channels=audio_stream.channels,
)
print(self._info)

@property
def info(self) -> MediaInfo:
return self._info

async def stream_video(self) -> AsyncIterable[rtc.VideoFrame]:
async def stream_video(
self, av_sync: rtc.AVSynchronizer
) -> AsyncIterable[tuple[rtc.VideoFrame, float]]:
"""Streams video frames from the media file in an endless loop."""
for av_frame in self._video_container.decode(video=0):
for i, av_frame in enumerate(self._video_container.decode(video=0)):
# Convert video frame to RGBA
frame = av_frame.to_rgb().to_ndarray()
frame_rgba = np.ones((frame.shape[0], frame.shape[1], 4), dtype=np.uint8)
frame_rgba[:, :, :3] = frame
yield rtc.VideoFrame(
width=frame.shape[1],
height=frame.shape[0],
type=rtc.VideoBufferType.RGBA,
data=frame_rgba.tobytes(),

# put fps and timestamps in the frame
frame_rgba = cv2.putText(
frame_rgba, f"{av_sync.actual_fps:.2f}fps", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2
)

if i % 10 == 0:
print(
f"decoded frame {i} ({av_frame.time:.3f}s), {av_sync.actual_fps:.2f}fps, "
f"last video time: {av_sync.last_video_time:.3f}s, last audio time: {av_sync.last_audio_time:.3f}s"
)
yield (
rtc.VideoFrame(
width=frame.shape[1],
height=frame.shape[0],
type=rtc.VideoBufferType.RGBA,
data=frame_rgba.tobytes(),
),
av_frame.time,
)

async def stream_audio(self) -> AsyncIterable[rtc.AudioFrame]:
async def stream_audio(self) -> AsyncIterable[tuple[rtc.AudioFrame, float]]:
"""Streams audio frames from the media file in an endless loop."""
for av_frame in self._audio_container.decode(audio=0):
for i, av_frame in enumerate(self._audio_container.decode(audio=0)):
# Convert audio frame to raw int16 samples
frame = av_frame.to_ndarray().T # Transpose to (samples, channels)
frame = (frame * 32768).astype(np.int16)
yield rtc.AudioFrame(
data=frame.tobytes(),
sample_rate=self.info.audio_sample_rate,
num_channels=frame.shape[1],
samples_per_channel=frame.shape[0],
yield (
rtc.AudioFrame(
data=frame.tobytes(),
sample_rate=self.info.audio_sample_rate,
num_channels=frame.shape[1],
samples_per_channel=frame.shape[0],
),
av_frame.time,
)

def reset(self):
Expand All @@ -102,6 +123,7 @@ async def main(room: rtc.Room, room_name: str, media_path: str):
api.VideoGrants(
room_join=True,
room=room_name,
agent=True,
)
)
.to_jwt()
Expand All @@ -121,7 +143,7 @@ async def main(room: rtc.Room, room_name: str, media_path: str):
media_info = streamer.info

# Create video and audio sources/tracks
queue_size_ms = 1000 # 1 second
queue_size_ms = 50 # TODO: testing with different sizes
video_source = rtc.VideoSource(
width=media_info.video_width,
height=media_info.video_height,
Expand Down Expand Up @@ -157,18 +179,18 @@ async def main(room: rtc.Room, room_name: str, media_path: str):
)

async def _push_frames(
stream: AsyncIterable[rtc.VideoFrame | rtc.AudioFrame],
stream: AsyncIterable[tuple[rtc.VideoFrame | rtc.AudioFrame, float]],
av_sync: rtc.AVSynchronizer,
):
async for frame in stream:
await av_sync.push(frame)
async for frame, timestamp in stream:
await av_sync.push(frame, timestamp)
await asyncio.sleep(0)

try:
while True:
streamer.reset()
video_task = asyncio.create_task(
_push_frames(streamer.stream_video(), av_sync)
_push_frames(streamer.stream_video(av_sync), av_sync)
)
audio_task = asyncio.create_task(
_push_frames(streamer.stream_audio(), av_sync)
Expand Down
24 changes: 20 additions & 4 deletions livekit-rtc/livekit/rtc/synchronizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ def __init__(
self._max_delay_tolerance_ms = _max_delay_tolerance_ms

self._stopped = False
self._last_video_time: float = 0
self._last_audio_time: float = 0

self._video_queue_max_size = int(
self._video_fps * self._video_queue_size_ms / 1000
Expand All @@ -51,7 +53,7 @@ def __init__(
# ensure queue is bounded if queue size is specified
self._video_queue_max_size = max(1, self._video_queue_max_size)

self._video_queue = asyncio.Queue[VideoFrame](
self._video_queue = asyncio.Queue[tuple[VideoFrame, float]](
maxsize=self._video_queue_max_size
)
self._fps_controller = _FPSController(
Expand All @@ -60,12 +62,16 @@ def __init__(
)
self._capture_video_task = asyncio.create_task(self._capture_video())

async def push(self, frame: Union[VideoFrame, AudioFrame]) -> None:
async def push(
self, frame: Union[VideoFrame, AudioFrame], timestamp: Optional[float] = None
) -> None:
if isinstance(frame, AudioFrame):
await self._audio_source.capture_frame(frame)
if timestamp is not None:
self._last_audio_time = timestamp
return

await self._video_queue.put(frame)
await self._video_queue.put((frame, timestamp))

async def clear_queue(self) -> None:
self._audio_source.clear_queue()
Expand All @@ -79,9 +85,11 @@ async def wait_for_playout(self) -> None:

async def _capture_video(self) -> None:
while not self._stopped:
frame = await self._video_queue.get()
frame, timestamp = await self._video_queue.get()
async with self._fps_controller:
self._video_source.capture_frame(frame)
if timestamp is not None:
self._last_video_time = timestamp
self._video_queue.task_done()

async def aclose(self) -> None:
Expand All @@ -93,6 +101,14 @@ async def aclose(self) -> None:
def actual_fps(self) -> float:
return self._fps_controller.actual_fps

@property
def last_video_time(self) -> float:
return self._last_video_time

@property
def last_audio_time(self) -> float:
return self._last_audio_time


class _FPSController:
def __init__(
Expand Down

0 comments on commit 1920690

Please sign in to comment.