From 70ae89b107dbe5990558c98b09a9d0c38288c5b4 Mon Sep 17 00:00:00 2001 From: Tim Sweeney Date: Fri, 11 Oct 2024 14:56:29 -0700 Subject: [PATCH] refactor 3 --- tests/trace/test_op_coroutines.py | 89 +++++++++++++++++++++++++++++++ weave/trace/op.py | 13 +++-- weave/trace/weave_client.py | 3 +- 3 files changed, 100 insertions(+), 5 deletions(-) create mode 100644 tests/trace/test_op_coroutines.py diff --git a/tests/trace/test_op_coroutines.py b/tests/trace/test_op_coroutines.py new file mode 100644 index 00000000000..0000b1e3caf --- /dev/null +++ b/tests/trace/test_op_coroutines.py @@ -0,0 +1,89 @@ +import asyncio +from typing import Coroutine + +import pytest +import weave +from weave.trace.weave_client import Call + + +def test_non_async_non_coro(client): + @weave.op() + def non_async_non_coro(): + return 1 + + res = non_async_non_coro() + assert res == 1 + res, call = non_async_non_coro.call() + assert isinstance(call, Call) + assert call.ended_at is not None + assert res == 1 + assert call.ended_at is not None + +@pytest.mark.asyncio +async def test_non_async_coro(client): + @weave.op() + def non_async_coro(): + return asyncio.to_thread(lambda: 1) + + res = non_async_coro() + assert isinstance(res, Coroutine) + assert await res == 1 + res, call = non_async_coro.call() + assert isinstance(call, Call) + assert call.ended_at is not None + assert isinstance(res, Coroutine) + assert await res == 1 + assert call.ended_at is not None + +@pytest.mark.asyncio +async def test_async_coro(client): + @weave.op() + async def async_coro(): + return asyncio.to_thread(lambda: 1) + + res = async_coro() + assert isinstance(res, Coroutine) + res2 = await res + assert isinstance(res2, Coroutine) + assert await res2 == 1 + res, call = async_coro.call() + assert isinstance(call, Call) + assert call.ended_at is None # BIG DIFFERENCE! We haven't ended the outer op yet! + assert isinstance(res, Coroutine) + res2 = await res + assert isinstance(res2, Coroutine) # Since we did not await the inner coroutine, we still have a coroutine here. + assert await res2 == 1 + assert call.ended_at is not None + +@pytest.mark.asyncio +async def test_async_awaited_coro(client): + @weave.op() + async def async_awaited_coro(): + return await asyncio.to_thread(lambda: 1) + + res = async_awaited_coro() + assert isinstance(res, Coroutine) + assert await res == 1 + res, call = async_awaited_coro.call() + assert isinstance(call, Call) + assert call.ended_at is None # BIG DIFFERENCE! We haven't ended the outer op yet! + assert isinstance(res, Coroutine) + assert await res == 1 + assert call.ended_at is not None + + +@pytest.mark.asyncio +async def test_async_non_coro(client): + @weave.op() + async def async_non_coro(): + return 1 + + res = async_non_coro() + assert isinstance(res, Coroutine) + assert await res == 1 + res, call = async_non_coro.call() + assert isinstance(call, Call) + assert call.ended_at is None # BIG DIFFERENCE! We haven't ended the outer op yet! + assert isinstance(res, Coroutine) + assert await res == 1 + assert call.ended_at is not None \ No newline at end of file diff --git a/weave/trace/op.py b/weave/trace/op.py index 158776447f0..a18ea2d7165 100644 --- a/weave/trace/op.py +++ b/weave/trace/op.py @@ -341,7 +341,7 @@ def add(a: int, b: int) -> int: call = Call(_op_name="", trace_id="", parent_id=None, project_id="", inputs={}) func = op.resolve_fn - + is_async = inspect.iscoroutinefunction(func) if settings.should_disable_weave(): res = func(*args, **kwargs) elif weave_client_context.get_weave_client() is None: @@ -350,19 +350,24 @@ def add(a: int, b: int) -> int: res = func(*args, **kwargs) else: try: - # This try/except allows us to fail gracefully and - # still let the user code continue to execute + # Weird consideration: if the user does `op.call` and does not + # await the result, then the `started_at` timestamp will be + # set before the op itself executes! call = _create_call(op, *args, __weave=__weave, **kwargs) execute_res = _execute_call( op, call, *args, __should_raise=__should_raise, **kwargs ) return execute_res except Exception as e: + # This try/except allows us to fail gracefully and + # still let the user code continue to execute if get_raise_on_captured_errors(): raise log_once( logger.error, - ASYNC_CALL_CREATE_MSG.format(traceback.format_exc()), + (ASYNC_CALL_CREATE_MSG if is_async else CALL_CREATE_MSG).format( + traceback.format_exc() + ), ) res = func(*args, **kwargs) diff --git a/weave/trace/weave_client.py b/weave/trace/weave_client.py index eb3650a00d9..4a2f07b2046 100644 --- a/weave/trace/weave_client.py +++ b/weave/trace/weave_client.py @@ -731,6 +731,8 @@ def finish_call( *, op: Optional[Op] = None, ) -> None: + ended_at = datetime.datetime.now(tz=datetime.timezone.utc) + call.ended_at = ended_at original_output = output if op is not None and op.postprocess_output: @@ -787,7 +789,6 @@ def finish_call( call.exception = exception_str project_id = self._project_id() - ended_at = datetime.datetime.now(tz=datetime.timezone.utc) # The finish handler serves as a last chance for integrations # to customize what gets logged for a call.