async def entrypoint(ctx: JobContext):
    """Run a voice pipeline agent that logs its metrics and the session's cost.

    Connects to the room (audio only), waits for a participant, starts a
    ``VoicePipelineAgent`` and registers:

    * a ``metrics_collected`` handler that logs every metrics event and feeds
      it to a ``metrics.UsageCollector``;
    * a shutdown callback that converts the collected usage into a cost
      estimate using the module-level price constants and logs it.
    """
    initial_ctx = llm.ChatContext().append(
        role="system",
        text=(
            "You are a voice assistant created by LiveKit. Your interface with users will be voice. "
            "You should use short and concise responses, and avoiding usage of unpronouncable punctuation."
        ),
    )

    await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)

    participant = await ctx.wait_for_participant()
    agent = VoicePipelineAgent(
        vad=ctx.proc.userdata["vad"],
        stt=deepgram.STT(),
        llm=openai.LLM(),
        tts=openai.TTS(),
        chat_ctx=initial_ctx,
    )

    usage_collector = metrics.UsageCollector()

    @agent.on("metrics_collected")
    def _on_metrics_collected(agent_metrics: metrics.AgentMetrics):
        # The parameter must NOT be called `metrics`: that would shadow the
        # `metrics` module inside this function and make
        # `metrics.log_metrics(...)` an attribute lookup on the event object.
        metrics.log_metrics(agent_metrics)
        usage_collector.add_usage(agent_metrics)

    async def log_session_cost():
        summary = usage_collector.get_summary()
        llm_cost = (
            summary.llm_prompt_tokens * OPENAI_LLM_INPUT_PRICE
            + summary.llm_completion_tokens * OPENAI_LLM_OUTPUT_PRICE
        )
        tts_cost = summary.tts_characters_count * OPENAI_TTS_PRICE
        # DEEPGRAM_STT_PRICE is per minute; dividing by 60 assumes
        # stt_audio_duration is expressed in seconds -- TODO confirm.
        stt_cost = summary.stt_audio_duration * DEEPGRAM_STT_PRICE / 60

        total_cost = llm_cost + tts_cost + stt_cost

        logger.info(
            f"Total cost: ${total_cost:.4f} (LLM: ${llm_cost:.4f}, TTS: ${tts_cost:.4f}, STT: ${stt_cost:.4f})"
        )

    ctx.add_shutdown_callback(log_session_cost)

    agent.start(ctx.room, participant)
    await agent.say("Hey, how can I help you today?", allow_interruptions=True)
async def log_usage():
    """Log the usage totals accumulated by ``usage_collector``."""
    summary = usage_collector.get_summary()
    # No "$" before the placeholder: the summary holds token, character and
    # audio-duration counts, not a dollar amount.
    logger.info(f"Usage: {summary}")


ctx.add_shutdown_callback(log_usage)
- ), - ) - - await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY) - - participant = await ctx.wait_for_participant() - agent = VoicePipelineAgent( - vad=ctx.proc.userdata["vad"], - stt=deepgram.STT(), - llm=openai.LLM(), - tts=openai.TTS(), - chat_ctx=initial_ctx, - ) - - total_llm_prompt_tokens = 0 - total_llm_completion_tokens = 0 - total_tts_characters_count = 0 - total_stt_audio_duration = 0 - - @agent.on("metrics_collected") - def on_metrics_collected(metrics: PipelineMetrics): - nonlocal \ - total_llm_prompt_tokens, \ - total_llm_completion_tokens, \ - total_tts_characters_count, \ - total_stt_audio_duration - - if metrics["type"] == "vad_metrics": - return # don't log VAD metrics because it is noisy - - if metrics["type"] == "llm_metrics": - total_llm_prompt_tokens += metrics["prompt_tokens"] - total_llm_completion_tokens += metrics["completion_tokens"] - - sequence_id = metrics["sequence_id"] - ttft = metrics["ttft"] - tokens_per_second = metrics["tokens_per_second"] - - logger.info( - f"LLM metrics: sequence_id={sequence_id}, ttft={ttft:.2f}, tokens_per_second={tokens_per_second:.2f}" - ) - - elif metrics["type"] == "tts_metrics": - total_tts_characters_count += metrics["characters_count"] - - sequence_id = metrics["sequence_id"] - ttfb = metrics["ttfb"] - audio_duration = metrics["audio_duration"] - - logger.info( - f"TTS metrics: sequence_id={sequence_id}, ttfb={ttfb}, audio_duration={audio_duration:.2f}" - ) - - elif metrics["type"] == "eou_metrics": - sequence_id = metrics["sequence_id"] - end_of_utterance_delay = metrics["end_of_utterance_delay"] - transcription_delay = metrics["transcription_delay"] - - logger.info( - f"EOU metrics: sequence_id={sequence_id}, end_of_utterance_delay={end_of_utterance_delay:.2f}, transcription_delay={transcription_delay:.2f}" - ) - - elif metrics["type"] == "stt_metrics": - total_stt_audio_duration += metrics["audio_duration"] - logger.info(f"STT metrics: audio_duration={metrics['audio_duration']:.2f}") - - async 
def log_session_cost(): - llm_cost = ( - total_llm_prompt_tokens * OPENAI_LLM_INPUT_PRICE - + total_llm_completion_tokens * OPENAI_LLM_OUTPUT_PRICE - ) - tts_cost = total_tts_characters_count * OPENAI_TTS_PRICE - stt_cost = total_stt_audio_duration * DEEPGRAM_STT_PRICE / 60 - - total_cost = llm_cost + tts_cost + stt_cost - - logger.info( - f"Total cost: ${total_cost:.4f} (LLM: ${llm_cost:.4f}, TTS: ${tts_cost:.4f}, STT: ${stt_cost:.4f})" - ) - - ctx.add_shutdown_callback(log_session_cost) - - agent.start(ctx.room, participant) - await agent.say("Hey, how can I help you today?", allow_interruptions=True) - - -if __name__ == "__main__": - cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint, prewarm_fnc=prewarm)) diff --git a/livekit-agents/livekit/agents/__init__.py b/livekit-agents/livekit/agents/__init__.py index 8ef431097..14d40766b 100644 --- a/livekit-agents/livekit/agents/__init__.py +++ b/livekit-agents/livekit/agents/__init__.py @@ -16,6 +16,7 @@ cli, ipc, llm, + metrics, multimodal, pipeline, stt, @@ -60,6 +61,7 @@ "tts", "tokenize", "llm", + "metrics", "transcription", "pipeline", "multimodal", diff --git a/livekit-agents/livekit/agents/llm/__init__.py b/livekit-agents/livekit/agents/llm/__init__.py index ab3149739..e77564048 100644 --- a/livekit-agents/livekit/agents/llm/__init__.py +++ b/livekit-agents/livekit/agents/llm/__init__.py @@ -16,13 +16,11 @@ Choice, ChoiceDelta, CompletionUsage, - LLMMetrics, LLMStream, ) __all__ = [ "LLM", - "LLMMetrics", "LLMStream", "ChatContext", "ChatRole", diff --git a/livekit-agents/livekit/agents/llm/llm.py b/livekit-agents/livekit/agents/llm/llm.py index d18bae173..4c89fccdc 100644 --- a/livekit-agents/livekit/agents/llm/llm.py +++ b/livekit-agents/livekit/agents/llm/llm.py @@ -4,29 +4,17 @@ import time from abc import ABC, abstractmethod from dataclasses import dataclass, field -from typing import Any, AsyncIterable, AsyncIterator, Literal, TypedDict +from typing import Any, AsyncIterable, AsyncIterator, Literal from 
livekit import rtc from .. import utils +from ..metrics import LLMMetrics from ..utils import aio from . import function_context from .chat_context import ChatContext, ChatRole -class LLMMetrics(TypedDict): - request_id: str - timestamp: float - ttft: float - duration: float - label: str - cancelled: bool - completion_tokens: int - prompt_tokens: int - total_tokens: int - tokens_per_second: float - - @dataclass class ChoiceDelta: role: ChatRole @@ -115,18 +103,19 @@ async def _metrics_monitor_task( usage = ev.usage duration = time.perf_counter() - start_time - metrics: LLMMetrics = { - "timestamp": time.time(), - "request_id": request_id, - "ttft": ttft, - "duration": duration, - "cancelled": self._task.cancelled(), - "label": self._llm._label, - "completion_tokens": usage.completion_tokens if usage else 0, - "prompt_tokens": usage.prompt_tokens if usage else 0, - "total_tokens": usage.total_tokens if usage else 0, - "tokens_per_second": usage.completion_tokens / duration if usage else 0.0, - } + metrics = LLMMetrics( + timestamp=time.time(), + request_id=request_id, + ttft=ttft, + duration=duration, + cancelled=self._task.cancelled(), + label=self._llm._label, + completion_tokens=usage.completion_tokens if usage else 0, + prompt_tokens=usage.prompt_tokens if usage else 0, + total_tokens=usage.total_tokens if usage else 0, + tokens_per_second=usage.completion_tokens / duration if usage else 0.0, + error=None, + ) self._llm.emit("metrics_collected", metrics) @property diff --git a/livekit-agents/livekit/agents/metrics/__init__.py b/livekit-agents/livekit/agents/metrics/__init__.py new file mode 100644 index 000000000..fc5a00d65 --- /dev/null +++ b/livekit-agents/livekit/agents/metrics/__init__.py @@ -0,0 +1,30 @@ +from .base import ( + AgentMetrics, + LLMMetrics, + PipelineEOUMetrics, + PipelineLLMMetrics, + PipelineSTTMetrics, + PipelineTTSMetrics, + PipelineVADMetrics, + STTMetrics, + TTSMetrics, + VADMetrics, +) +from .usage_collector import UsageCollector, 
@dataclass
class Error:
    """Error payload attached to metrics records; it currently carries no fields."""


@dataclass
class LLMMetrics:
    """Measurements emitted for one LLM request."""

    request_id: str
    timestamp: float
    ttft: float
    duration: float
    label: str
    cancelled: bool
    completion_tokens: int
    prompt_tokens: int
    total_tokens: int
    tokens_per_second: float
    error: Error | None


@dataclass
class STTMetrics:
    """Measurements emitted for one speech-to-text request."""

    request_id: str
    timestamp: float
    duration: float
    label: str
    audio_duration: float
    streamed: bool
    error: Error | None


@dataclass
class TTSMetrics:
    """Measurements emitted for one text-to-speech request."""

    request_id: str
    timestamp: float
    ttfb: float
    duration: float
    audio_duration: float
    cancelled: bool
    characters_count: int
    label: str
    streamed: bool
    error: Error | None


@dataclass
class VADMetrics:
    """Aggregated inference measurements emitted by a VAD stream."""

    timestamp: float
    inference_duration_total: float
    inference_count: int
    label: str


@dataclass
class PipelineSTTMetrics(STTMetrics):
    """STT metrics as re-emitted by the voice pipeline agent."""


@dataclass
class PipelineEOUMetrics:
    sequence_id: str
    """Unique identifier shared across different metrics to combine related STT, LLM, and TTS metrics."""

    timestamp: float
    """Timestamp of when the event was recorded."""

    end_of_utterance_delay: float
    """Amount of time between the end of speech from VAD and the decision to end the user's turn."""

    transcription_delay: float
    """Time taken to obtain the transcript after the end of the user's speech.

    May be 0 if the transcript was already available.
    """


@dataclass
class PipelineLLMMetrics(LLMMetrics):
    sequence_id: str
    """Unique identifier shared across different metrics to combine related STT, LLM, and TTS metrics."""


@dataclass
class PipelineTTSMetrics(TTSMetrics):
    sequence_id: str
    """Unique identifier shared across different metrics to combine related STT, LLM, and TTS metrics."""


@dataclass
class PipelineVADMetrics(VADMetrics):
    """VAD metrics as re-emitted by the voice pipeline agent."""


# Every metrics record type that can be passed to a metrics consumer.
AgentMetrics = Union[
    STTMetrics,
    LLMMetrics,
    TTSMetrics,
    VADMetrics,
    PipelineSTTMetrics,
    PipelineEOUMetrics,
    PipelineLLMMetrics,
    PipelineTTSMetrics,
    PipelineVADMetrics,
]


@dataclass
class UsageSummary:
    """Running totals of the usage figures tracked by ``UsageCollector``."""

    llm_prompt_tokens: int
    llm_completion_tokens: int
    tts_characters_count: int
    stt_audio_duration: float


class UsageCollector:
    """Accumulate LLM, TTS and STT usage from a stream of metrics records."""

    def __init__(self) -> None:
        self._summary = UsageSummary(
            llm_prompt_tokens=0,
            llm_completion_tokens=0,
            tts_characters_count=0,
            stt_audio_duration=0.0,
        )

    def __call__(self, metrics: AgentMetrics) -> None:
        """Allow the collector itself to be used as a metrics callback."""
        self.add_usage(metrics)

    def add_usage(self, metrics: AgentMetrics) -> None:
        """Fold one metrics record into the totals.

        LLM, TTS and STT records (including their pipeline subclasses, via
        ``isinstance``) are counted; any other record type is ignored.
        """
        totals = self._summary

        if isinstance(metrics, LLMMetrics):
            totals.llm_prompt_tokens += metrics.prompt_tokens
            totals.llm_completion_tokens += metrics.completion_tokens
            return

        if isinstance(metrics, TTSMetrics):
            totals.tts_characters_count += metrics.characters_count
            return

        if isinstance(metrics, STTMetrics):
            totals.stt_audio_duration += metrics.audio_duration

    def get_summary(self) -> UsageSummary:
        """Return a copy of the totals so callers cannot mutate internal state."""
        snapshot = deepcopy(self._summary)
        return snapshot
def log_metrics(
    metrics: AgentMetrics, *, logger: logging.Logger | None = None
) -> None:
    """Log a one-line, human-readable summary of a metrics record at INFO level.

    Args:
        metrics: The metrics record to describe.
        logger: Logger to write to; falls back to the package's default
            logger when omitted.

    The ``Pipeline*`` classes subclass the plain metrics classes, so each
    pipeline type is tested before its base type; otherwise the base branch
    would match first and the ``sequence_id`` would never be logged.
    Record types without a branch here (e.g. VAD metrics) are not logged.
    """
    if logger is None:
        logger = default_logger

    if isinstance(metrics, PipelineLLMMetrics):
        logger.info(
            f"Pipeline LLM metrics: sequence_id={metrics.sequence_id}, ttft={metrics.ttft:.2f}, input_tokens={metrics.prompt_tokens}, output_tokens={metrics.completion_tokens}, tokens_per_second={metrics.tokens_per_second:.2f}"
        )
    elif isinstance(metrics, LLMMetrics):
        logger.info(
            f"LLM metrics: ttft={metrics.ttft:.2f}, input_tokens={metrics.prompt_tokens}, output_tokens={metrics.completion_tokens}, tokens_per_second={metrics.tokens_per_second:.2f}"
        )
    elif isinstance(metrics, PipelineTTSMetrics):
        # ttfb is rounded to two decimals like every other float in these messages.
        logger.info(
            f"Pipeline TTS metrics: sequence_id={metrics.sequence_id}, ttfb={metrics.ttfb:.2f}, audio_duration={metrics.audio_duration:.2f}"
        )
    elif isinstance(metrics, TTSMetrics):
        logger.info(
            f"TTS metrics: ttfb={metrics.ttfb:.2f}, audio_duration={metrics.audio_duration:.2f}"
        )
    elif isinstance(metrics, PipelineEOUMetrics):
        logger.info(
            f"Pipeline EOU metrics: sequence_id={metrics.sequence_id}, end_of_utterance_delay={metrics.end_of_utterance_delay:.2f}, transcription_delay={metrics.transcription_delay:.2f}"
        )
    elif isinstance(metrics, PipelineSTTMetrics):
        logger.info(
            f"Pipeline STT metrics: duration={metrics.duration:.2f}, audio_duration={metrics.audio_duration:.2f}"
        )
    elif isinstance(metrics, STTMetrics):
        logger.info(f"STT metrics: audio_duration={metrics.audio_duration:.2f}")
a/livekit-agents/livekit/agents/pipeline/__init__.py b/livekit-agents/livekit/agents/pipeline/__init__.py index ac012407d..480dd7990 100644 --- a/livekit-agents/livekit/agents/pipeline/__init__.py +++ b/livekit-agents/livekit/agents/pipeline/__init__.py @@ -1,11 +1,3 @@ -from .metrics import ( - PipelineEOUMetrics, - PipelineLLMMetrics, - PipelineMetrics, - PipelineSTTMetrics, - PipelineTTSMetrics, - PipelineVADMetrics, -) from .pipeline_agent import ( AgentCallContext, AgentTranscriptionOptions, @@ -16,10 +8,4 @@ "VoicePipelineAgent", "AgentCallContext", "AgentTranscriptionOptions", - "PipelineMetrics", - "PipelineSTTMetrics", - "PipelineEOUMetrics", - "PipelineLLMMetrics", - "PipelineTTSMetrics", - "PipelineVADMetrics", ] diff --git a/livekit-agents/livekit/agents/pipeline/metrics.py b/livekit-agents/livekit/agents/pipeline/metrics.py deleted file mode 100644 index 72175085d..000000000 --- a/livekit-agents/livekit/agents/pipeline/metrics.py +++ /dev/null @@ -1,65 +0,0 @@ -from __future__ import annotations - -import contextvars -from dataclasses import dataclass -from typing import Literal, TypedDict, Union - -from ..llm import LLMMetrics -from ..stt import STTMetrics -from ..tts import TTSMetrics -from ..vad import VADMetrics - - -@dataclass -class SpeechData: - sequence_id: str - - -SpeechDataContextVar = contextvars.ContextVar[SpeechData]("voice_assistant_speech_data") - - -class PipelineSTTMetrics(STTMetrics, TypedDict): - type: Literal["stt_metrics"] - - -class PipelineEOUMetrics(TypedDict): - type: Literal["eou_metrics"] - sequence_id: str - """Unique identifier shared across different metrics to combine related STT, LLM, and TTS metrics.""" - - timestamp: float - """Timestamp of when the event was recorded.""" - - end_of_utterance_delay: float - """Amount of time between the end of speech from VAD and the decision to end the user's turn.""" - - transcription_delay: float - """Time taken to obtain the transcript after the end of the user's speech. 
- - May be 0 if the transcript was already available. - """ - - -class PipelineLLMMetrics(LLMMetrics, TypedDict): - type: Literal["llm_metrics"] - sequence_id: str - """Unique identifier shared across different metrics to combine related STT, LLM, and TTS metrics.""" - - -class PipelineTTSMetrics(TTSMetrics, TypedDict): - type: Literal["tts_metrics"] - sequence_id: str - """Unique identifier shared across different metrics to combine related STT, LLM, and TTS metrics.""" - - -class PipelineVADMetrics(VADMetrics, TypedDict): - type: Literal["vad_metrics"] - - -PipelineMetrics = Union[ - PipelineSTTMetrics, - PipelineEOUMetrics, - PipelineLLMMetrics, - PipelineTTSMetrics, - PipelineVADMetrics, -] diff --git a/livekit-agents/livekit/agents/pipeline/pipeline_agent.py b/livekit-agents/livekit/agents/pipeline/pipeline_agent.py index 03efcf71c..69376c426 100644 --- a/livekit-agents/livekit/agents/pipeline/pipeline_agent.py +++ b/livekit-agents/livekit/agents/pipeline/pipeline_agent.py @@ -17,11 +17,10 @@ from livekit import rtc -from .. import llm, stt, tokenize, tts, utils, vad +from .. import metrics, stt, tokenize, tts, utils, vad from .._constants import ATTRIBUTE_AGENT_STATE from .._types import AgentState from ..llm import LLM, ChatContext, ChatMessage, FunctionContext, LLMStream -from . 
@dataclass
class SpeechData:
    """Per-speech context value; carries the id used as metrics ``sequence_id``."""

    sequence_id: str


# Context variable through which the TTS/LLM metrics handlers look up the
# SpeechData of the speech currently being synthesized.
SpeechDataContextVar: contextvars.ContextVar[SpeechData] = contextvars.ContextVar(
    "voice_assistant_speech_data"
)
**llm_metrics, - } - self.emit("metrics_collected", pipeline_metrics) + self.emit( + "metrics_collected", + metrics.PipelineLLMMetrics( + **llm_metrics.__dict__, + sequence_id=speech_data.sequence_id, + ), + ) @self._vad.on("metrics_collected") def _on_vad_metrics(vad_metrics: vad.VADMetrics) -> None: - pipeline_metrics: metrics.PipelineMetrics = { - "type": "vad_metrics", - **vad_metrics, - } - self.emit("metrics_collected", pipeline_metrics) + self.emit( + "metrics_collected", metrics.PipelineVADMetrics(**vad_metrics.__dict__) + ) room.on("participant_connected", self._on_participant_connected) self._room, self._participant = room, participant @@ -626,7 +633,7 @@ async def _synthesize_answer_task( ChatMessage.create(text=handle.user_question, role="user") ) - tk = metrics.SpeechDataContextVar.set(metrics.SpeechData(sequence_id=handle.id)) + tk = SpeechDataContextVar.set(SpeechData(sequence_id=handle.id)) try: llm_stream = self._opts.before_llm_cb(self, copied_ctx) if asyncio.iscoroutine(llm_stream): @@ -646,7 +653,7 @@ async def _synthesize_answer_task( synthesis_handle = self._synthesize_agent_speech(handle.id, llm_stream) handle.initialize(source=llm_stream, synthesis_handle=synthesis_handle) finally: - metrics.SpeechDataContextVar.reset(tk) + SpeechDataContextVar.reset(tk) async def _play_speech(self, speech_handle: SpeechHandle) -> None: try: @@ -848,7 +855,7 @@ def _synthesize_agent_speech( self._agent_output is not None ), "agent output should be initialized when ready" - tk = metrics.SpeechDataContextVar.set(metrics.SpeechData(speech_id)) + tk = SpeechDataContextVar.set(SpeechData(speech_id)) async def _llm_stream_to_str_generator( stream: LLMStream, @@ -890,7 +897,7 @@ async def _llm_stream_to_str_generator( hyphenate_word=self._opts.transcription.hyphenate_word, ) finally: - metrics.SpeechDataContextVar.reset(tk) + SpeechDataContextVar.reset(tk) def _validate_reply_if_possible(self) -> None: """Check if the new agent speech should be played""" @@ -933,13 
+940,12 @@ def _validate_reply_if_possible(self) -> None: (self._last_final_transcript_time or 0) - self._last_speech_time, 0 ) - eou_metrics: metrics.PipelineEOUMetrics = { - "type": "eou_metrics", - "timestamp": time.time(), - "sequence_id": self._pending_agent_reply.id, - "end_of_utterance_delay": time_since_last_speech, - "transcription_delay": transcription_delay, - } + eou_metrics = metrics.PipelineEOUMetrics( + timestamp=time.time(), + sequence_id=self._pending_agent_reply.id, + end_of_utterance_delay=time_since_last_speech, + transcription_delay=transcription_delay, + ) self.emit("metrics_collected", eou_metrics) self._add_speech_for_playout(self._pending_agent_reply) diff --git a/livekit-agents/livekit/agents/stt/__init__.py b/livekit-agents/livekit/agents/stt/__init__.py index ba3da8b62..3b1fb146c 100644 --- a/livekit-agents/livekit/agents/stt/__init__.py +++ b/livekit-agents/livekit/agents/stt/__init__.py @@ -7,7 +7,6 @@ SpeechEventType, SpeechStream, STTCapabilities, - STTMetrics, ) __all__ = [ @@ -16,7 +15,6 @@ "SpeechData", "SpeechStream", "STT", - "STTMetrics", "STTCapabilities", "StreamAdapter", "StreamAdapterWrapper", diff --git a/livekit-agents/livekit/agents/stt/stt.py b/livekit-agents/livekit/agents/stt/stt.py index 4632afcb2..d99890fcd 100644 --- a/livekit-agents/livekit/agents/stt/stt.py +++ b/livekit-agents/livekit/agents/stt/stt.py @@ -5,23 +5,15 @@ from abc import ABC, abstractmethod from dataclasses import dataclass, field from enum import Enum, unique -from typing import AsyncIterable, AsyncIterator, List, Literal, TypedDict, Union +from typing import AsyncIterable, AsyncIterator, List, Literal, Union from livekit import rtc +from ..metrics import STTMetrics from ..utils import AudioBuffer, aio from ..utils.audio import calculate_audio_duration -class STTMetrics(TypedDict): - request_id: str - timestamp: float - duration: float - label: str - audio_duration: float - streamed: bool - - @unique class SpeechEventType(str, Enum): 
START_OF_SPEECH = "start_of_speech" @@ -87,14 +79,15 @@ async def recognize( start_time = time.perf_counter() event = await self._recognize_impl(buffer, language=language) duration = time.perf_counter() - start_time - stt_metrics: STTMetrics = { - "request_id": event.request_id, - "timestamp": time.time(), - "duration": duration, - "label": self._label, - "audio_duration": calculate_audio_duration(buffer), - "streamed": False, - } + stt_metrics = STTMetrics( + request_id=event.request_id, + timestamp=time.time(), + duration=duration, + label=self._label, + audio_duration=calculate_audio_duration(buffer), + streamed=False, + error=None, + ) self.emit("metrics_collected", stt_metrics) return event @@ -154,14 +147,15 @@ async def _metrics_monitor_task( ), "recognition_usage must be provided for RECOGNITION_USAGE event" duration = time.perf_counter() - start_time - stt_metrics: STTMetrics = { - "request_id": ev.request_id, - "timestamp": time.time(), - "duration": duration, - "label": self._stt._label, - "audio_duration": ev.recognition_usage.audio_duration, - "streamed": True, - } + stt_metrics = STTMetrics( + request_id=ev.request_id, + timestamp=time.time(), + duration=duration, + label=self._stt._label, + audio_duration=ev.recognition_usage.audio_duration, + streamed=True, + error=None, + ) self._stt.emit("metrics_collected", stt_metrics) diff --git a/livekit-agents/livekit/agents/tts/__init__.py b/livekit-agents/livekit/agents/tts/__init__.py index 0e9981871..1cd4dabfc 100644 --- a/livekit-agents/livekit/agents/tts/__init__.py +++ b/livekit-agents/livekit/agents/tts/__init__.py @@ -5,12 +5,10 @@ SynthesizedAudio, SynthesizeStream, TTSCapabilities, - TTSMetrics, ) __all__ = [ "TTS", - "TTSMetrics", "SynthesizedAudio", "SynthesizeStream", "TTSCapabilities", diff --git a/livekit-agents/livekit/agents/tts/tts.py b/livekit-agents/livekit/agents/tts/tts.py index ef1deda07..36da4646f 100644 --- a/livekit-agents/livekit/agents/tts/tts.py +++ 
b/livekit-agents/livekit/agents/tts/tts.py @@ -4,25 +4,14 @@ import time from abc import ABC, abstractmethod from dataclasses import dataclass -from typing import AsyncIterable, AsyncIterator, Literal, TypedDict, Union +from typing import AsyncIterable, AsyncIterator, Literal, Union from livekit import rtc +from ..metrics import TTSMetrics from ..utils import aio, audio -class TTSMetrics(TypedDict): - timestamp: float - request_id: str - ttfb: float - duration: float - audio_duration: float - cancelled: bool - characters_count: int - label: str - streamed: bool - - @dataclass class SynthesizedAudio: frame: rtc.AudioFrame @@ -109,17 +98,18 @@ async def _metrics_monitor_task( audio_duration += ev.frame.duration duration = time.perf_counter() - start_time - metrics: TTSMetrics = { - "timestamp": time.time(), - "request_id": request_id, - "ttfb": ttfb, - "duration": duration, - "characters_count": len(self._input_text), - "audio_duration": audio_duration, - "cancelled": self._task.cancelled(), - "label": self._tts._label, - "streamed": False, - } + metrics = TTSMetrics( + timestamp=time.time(), + request_id=request_id, + ttfb=ttfb, + duration=duration, + characters_count=len(self._input_text), + audio_duration=audio_duration, + cancelled=self._task.cancelled(), + label=self._tts._label, + streamed=False, + error=None, + ) self._tts.emit("metrics_collected", metrics) async def collect(self) -> rtc.AudioFrame: @@ -185,17 +175,18 @@ def _emit_metrics(): if not text: return - metrics: TTSMetrics = { - "timestamp": time.time(), - "request_id": request_id, - "ttfb": ttfb, - "duration": duration, - "characters_count": len(text), - "audio_duration": audio_duration, - "cancelled": self._task.cancelled(), - "label": self._tts._label, - "streamed": True, - } + metrics = TTSMetrics( + timestamp=time.time(), + request_id=request_id, + ttfb=ttfb, + duration=duration, + characters_count=len(text), + audio_duration=audio_duration, + cancelled=self._task.cancelled(), + 
label=self._tts._label, + streamed=True, + error=None, + ) self._tts.emit("metrics_collected", metrics) audio_duration = 0.0 diff --git a/livekit-agents/livekit/agents/vad.py b/livekit-agents/livekit/agents/vad.py index f94cc0fd4..b11a77656 100644 --- a/livekit-agents/livekit/agents/vad.py +++ b/livekit-agents/livekit/agents/vad.py @@ -5,20 +5,14 @@ from abc import ABC, abstractmethod from dataclasses import dataclass, field from enum import Enum, unique -from typing import AsyncIterable, AsyncIterator, List, Literal, TypedDict, Union +from typing import AsyncIterable, AsyncIterator, List, Literal, Union from livekit import rtc +from .metrics import VADMetrics from .utils import aio -class VADMetrics(TypedDict): - timestamp: float - inference_duration_total: float - inference_count: int - label: str - - @unique class VADEventType(str, Enum): START_OF_SPEECH = "start_of_speech" @@ -123,12 +117,12 @@ async def _metrics_monitor_task(self, event_aiter: AsyncIterable[VADEvent]) -> N inference_count += 1 if inference_count >= 1 / self._vad.capabilities.update_interval: - vad_metrics: VADMetrics = { - "timestamp": time.time(), - "inference_duration_total": inference_duration_total, - "inference_count": inference_count, - "label": self._vad._label, - } + vad_metrics = VADMetrics( + timestamp=time.time(), + inference_duration_total=inference_duration_total, + inference_count=inference_count, + label=self._vad._label, + ) self._vad.emit("metrics_collected", vad_metrics) inference_duration_total = 0.0