Skip to content

Commit

Permalink
fix: invalid request on anthropic (#1018)
Browse files Browse the repository at this point in the history
  • Loading branch information
theomonnom authored Oct 31, 2024
1 parent adf685e commit 95a90f0
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 20 deletions.
6 changes: 6 additions & 0 deletions .changeset/neat-islands-drum.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"livekit-plugins-anthropic": patch
"livekit-agents": patch
---

fix: invalid request on anthropic
1 change: 1 addition & 0 deletions livekit-agents/livekit/agents/cli/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
"openai",
"livekit",
"watchfiles",
"anthropic",
]


Expand Down
13 changes: 10 additions & 3 deletions livekit-agents/livekit/agents/llm/llm.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from livekit import rtc

from .. import utils
from ..log import logger
from ..metrics import LLMMetrics
from ..utils import aio
from . import function_context
Expand Down Expand Up @@ -86,6 +87,7 @@ def __init__(
@abstractmethod
async def _main_task(self) -> None: ...

@utils.log_exceptions(logger=logger)
async def _metrics_monitor_task(
self, event_aiter: AsyncIterable[ChatChunk]
) -> None:
Expand Down Expand Up @@ -150,10 +152,15 @@ async def aclose(self) -> None:
await self._metrics_task

async def __anext__(self) -> ChatChunk:
if self._task.done() and (exc := self._task.exception()):
raise exc
try:
val = await self._event_aiter.__anext__()
except StopAsyncIteration:
if self._task.done() and (exc := self._task.exception()):
raise exc from None

return await self._event_aiter.__anext__()
raise StopAsyncIteration

return val

def __aiter__(self) -> AsyncIterator[ChatChunk]:
return self
11 changes: 8 additions & 3 deletions livekit-agents/livekit/agents/stt/stt.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,10 +208,15 @@ async def aclose(self) -> None:
await self._metrics_task

async def __anext__(self) -> SpeechEvent:
if self._task.done() and (exc := self._task.exception()):
raise exc
try:
val = await self._event_aiter.__anext__()
except StopAsyncIteration:
if self._task.done() and (exc := self._task.exception()):
raise exc from None

return await self._event_aiter.__anext__()
raise StopAsyncIteration

return val

def __aiter__(self) -> AsyncIterator[SpeechEvent]:
return self
Expand Down
22 changes: 16 additions & 6 deletions livekit-agents/livekit/agents/tts/tts.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,15 @@ async def aclose(self) -> None:
await self._metrics_task

async def __anext__(self) -> SynthesizedAudio:
if self._task.done() and (exc := self._task.exception()):
raise exc
try:
val = await self._event_aiter.__anext__()
except StopAsyncIteration:
if self._task.done() and (exc := self._task.exception()):
raise exc from None

return await self._event_aiter.__anext__()
raise StopAsyncIteration

return val

def __aiter__(self) -> AsyncIterator[SynthesizedAudio]:
return self
Expand Down Expand Up @@ -254,10 +259,15 @@ def _check_input_not_ended(self) -> None:
raise RuntimeError(f"{cls.__module__}.{cls.__name__} input ended")

async def __anext__(self) -> SynthesizedAudio:
if self._task.done() and (exc := self._task.exception()):
raise exc
try:
val = await self._event_aiter.__anext__()
except StopAsyncIteration:
if self._task.done() and (exc := self._task.exception()):
raise exc from None

raise StopAsyncIteration

return await self._event_aiter.__anext__()
return val

def __aiter__(self) -> AsyncIterator[SynthesizedAudio]:
return self
11 changes: 8 additions & 3 deletions livekit-agents/livekit/agents/vad.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,15 @@ async def aclose(self) -> None:
await self._metrics_task

async def __anext__(self) -> VADEvent:
if self._task.done() and (exc := self._task.exception()):
raise exc
try:
val = await self._event_aiter.__anext__()
except StopAsyncIteration:
if self._task.done() and (exc := self._task.exception()):
raise exc from None

return await self._event_aiter.__anext__()
raise StopAsyncIteration

return val

def __aiter__(self) -> AsyncIterator[VADEvent]:
return self
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ def chat(
latest_system_message = _latest_system_message(chat_ctx)
anthropic_ctx = _build_anthropic_context(chat_ctx.messages, id(self))
collaped_anthropic_ctx = _merge_messages(anthropic_ctx)

stream = self._client.messages.create(
max_tokens=opts.get("max_tokens", 1024),
system=latest_system_message,
Expand Down Expand Up @@ -155,10 +156,10 @@ def __init__(
self._output_tokens = 0

async def _main_task(self) -> None:
if not self._anthropic_stream:
self._anthropic_stream = await self._awaitable_anthropic_stream

try:
if not self._anthropic_stream:
self._anthropic_stream = await self._awaitable_anthropic_stream

async with self._anthropic_stream as stream:
async for event in stream:
chat_chunk = self._parse_event(event)
Expand Down Expand Up @@ -325,7 +326,7 @@ def _build_anthropic_message(
a_content = a_msg["content"]

# add content if provided
if isinstance(msg.content, str):
if isinstance(msg.content, str) and msg.content:
a_msg["content"].append(
anthropic.types.TextBlock(
text=msg.content,
Expand All @@ -334,7 +335,7 @@ def _build_anthropic_message(
)
elif isinstance(msg.content, list):
for cnt in msg.content:
if isinstance(cnt, str):
if isinstance(cnt, str) and cnt:
content: anthropic.types.TextBlock = anthropic.types.TextBlock(
text=cnt,
type="text",
Expand Down

0 comments on commit 95a90f0

Please sign in to comment.