From 95a90f04ab9b4f49e8f7100e4c480e62f4ac03f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9o=20Monnom?= Date: Thu, 31 Oct 2024 12:57:50 -0700 Subject: [PATCH] fix: invalid request on anthropic (#1018) --- .changeset/neat-islands-drum.md | 6 +++++ livekit-agents/livekit/agents/cli/log.py | 1 + livekit-agents/livekit/agents/llm/llm.py | 13 ++++++++--- livekit-agents/livekit/agents/stt/stt.py | 11 +++++++--- livekit-agents/livekit/agents/tts/tts.py | 22 ++++++++++++++----- livekit-agents/livekit/agents/vad.py | 11 +++++++--- .../livekit/plugins/anthropic/llm.py | 11 +++++----- 7 files changed, 55 insertions(+), 20 deletions(-) create mode 100644 .changeset/neat-islands-drum.md diff --git a/.changeset/neat-islands-drum.md b/.changeset/neat-islands-drum.md new file mode 100644 index 000000000..6a5fff91f --- /dev/null +++ b/.changeset/neat-islands-drum.md @@ -0,0 +1,6 @@ +--- +"livekit-plugins-anthropic": patch +"livekit-agents": patch +--- + +fix: invalid request on anthropic diff --git a/livekit-agents/livekit/agents/cli/log.py b/livekit-agents/livekit/agents/cli/log.py index c6c578645..3702eb9db 100644 --- a/livekit-agents/livekit/agents/cli/log.py +++ b/livekit-agents/livekit/agents/cli/log.py @@ -18,6 +18,7 @@ "openai", "livekit", "watchfiles", + "anthropic", ] diff --git a/livekit-agents/livekit/agents/llm/llm.py b/livekit-agents/livekit/agents/llm/llm.py index 4c89fccdc..e260b07f9 100644 --- a/livekit-agents/livekit/agents/llm/llm.py +++ b/livekit-agents/livekit/agents/llm/llm.py @@ -9,6 +9,7 @@ from livekit import rtc from .. import utils +from ..log import logger from ..metrics import LLMMetrics from ..utils import aio from . import function_context @@ -86,6 +87,7 @@ def __init__( @abstractmethod async def _main_task(self) -> None: ... + @utils.log_exceptions(logger=logger) async def _metrics_monitor_task( self, event_aiter: AsyncIterable[ChatChunk] ) -> None: @@ -150,10 +152,15 @@ async def aclose(self) -> None: await self._metrics_task async def __anext__(self) -> ChatChunk: - if self._task.done() and (exc := self._task.exception()): - raise exc + try: + val = await self._event_aiter.__anext__() + except StopAsyncIteration: + if self._task.done() and (exc := self._task.exception()): + raise exc from None - return await self._event_aiter.__anext__() + raise StopAsyncIteration + + return val def __aiter__(self) -> AsyncIterator[ChatChunk]: return self diff --git a/livekit-agents/livekit/agents/stt/stt.py b/livekit-agents/livekit/agents/stt/stt.py index d99890fcd..9f0cf22e4 100644 --- a/livekit-agents/livekit/agents/stt/stt.py +++ b/livekit-agents/livekit/agents/stt/stt.py @@ -208,10 +208,15 @@ async def aclose(self) -> None: await self._metrics_task async def __anext__(self) -> SpeechEvent: - if self._task.done() and (exc := self._task.exception()): - raise exc + try: + val = await self._event_aiter.__anext__() + except StopAsyncIteration: + if self._task.done() and (exc := self._task.exception()): + raise exc from None - return await self._event_aiter.__anext__() + raise StopAsyncIteration + + return val def __aiter__(self) -> AsyncIterator[SpeechEvent]: return self diff --git a/livekit-agents/livekit/agents/tts/tts.py b/livekit-agents/livekit/agents/tts/tts.py index 36da4646f..e4c7b453e 100644 --- a/livekit-agents/livekit/agents/tts/tts.py +++ b/livekit-agents/livekit/agents/tts/tts.py @@ -129,10 +129,15 @@ async def aclose(self) -> None: await self._metrics_task async def __anext__(self) -> SynthesizedAudio: - if self._task.done() and (exc := self._task.exception()): - raise exc + try: + val = await self._event_aiter.__anext__() + except StopAsyncIteration: + if self._task.done() and (exc := self._task.exception()): + raise exc from None - return await self._event_aiter.__anext__() + raise StopAsyncIteration + + return val def __aiter__(self) -> AsyncIterator[SynthesizedAudio]: return self @@ -254,10 +259,15 @@ def _check_input_not_ended(self) -> None: raise RuntimeError(f"{cls.__module__}.{cls.__name__} input ended") async def __anext__(self) -> SynthesizedAudio: - if self._task.done() and (exc := self._task.exception()): - raise exc + try: + val = await self._event_aiter.__anext__() + except StopAsyncIteration: + if self._task.done() and (exc := self._task.exception()): + raise exc from None + + raise StopAsyncIteration - return await self._event_aiter.__anext__() + return val def __aiter__(self) -> AsyncIterator[SynthesizedAudio]: return self diff --git a/livekit-agents/livekit/agents/vad.py b/livekit-agents/livekit/agents/vad.py index b11a77656..93b204e4b 100644 --- a/livekit-agents/livekit/agents/vad.py +++ b/livekit-agents/livekit/agents/vad.py @@ -153,10 +153,15 @@ async def aclose(self) -> None: await self._metrics_task async def __anext__(self) -> VADEvent: - if self._task.done() and (exc := self._task.exception()): - raise exc + try: + val = await self._event_aiter.__anext__() + except StopAsyncIteration: + if self._task.done() and (exc := self._task.exception()): + raise exc from None - return await self._event_aiter.__anext__() + raise StopAsyncIteration + + return val def __aiter__(self) -> AsyncIterator[VADEvent]: return self diff --git a/livekit-plugins/livekit-plugins-anthropic/livekit/plugins/anthropic/llm.py b/livekit-plugins/livekit-plugins-anthropic/livekit/plugins/anthropic/llm.py index 42737650d..cc6cf704f 100644 --- a/livekit-plugins/livekit-plugins-anthropic/livekit/plugins/anthropic/llm.py +++ b/livekit-plugins/livekit-plugins-anthropic/livekit/plugins/anthropic/llm.py @@ -111,6 +111,7 @@ def chat( latest_system_message = _latest_system_message(chat_ctx) anthropic_ctx = _build_anthropic_context(chat_ctx.messages, id(self)) collaped_anthropic_ctx = _merge_messages(anthropic_ctx) + stream = self._client.messages.create( max_tokens=opts.get("max_tokens", 1024), system=latest_system_message, @@ -155,10 +156,10 @@ def __init__( self._output_tokens = 0 async def _main_task(self) -> None: - if not self._anthropic_stream: - self._anthropic_stream = await self._awaitable_anthropic_stream - try: + if not self._anthropic_stream: + self._anthropic_stream = await self._awaitable_anthropic_stream + async with self._anthropic_stream as stream: async for event in stream: chat_chunk = self._parse_event(event) @@ -325,7 +326,7 @@ def _build_anthropic_message( a_content = a_msg["content"] # add content if provided - if isinstance(msg.content, str): + if isinstance(msg.content, str) and msg.content: a_msg["content"].append( anthropic.types.TextBlock( text=msg.content, @@ -334,7 +335,7 @@ def _build_anthropic_message( ) elif isinstance(msg.content, list): for cnt in msg.content: - if isinstance(cnt, str): + if isinstance(cnt, str) and cnt: content: anthropic.types.TextBlock = anthropic.types.TextBlock( text=cnt, type="text",