diff --git a/src/intelligence_layer/core/task.py b/src/intelligence_layer/core/task.py index 0928c5fea..d63d4d2a2 100644 --- a/src/intelligence_layer/core/task.py +++ b/src/intelligence_layer/core/task.py @@ -68,7 +68,6 @@ def run(self, input: Input, tracer: Tracer) -> Output: Args: input: Generic input defined by the task implementation tracer: The `Tracer` used for tracing. - trace_id: An optional id of the run, used to track its trace. Returns: Generic output defined by the task implementation. """ @@ -95,7 +94,6 @@ def run_concurrently( concurrency_limit: An optional additional limit for the number of concurrently executed task for this method call. This can be used to prevent queue-full or similar error of downstream APIs when the global concurrency limit is too high for a certain task. - trace_id: An optional id of the run, used to track its trace. Returns: The Outputs generated by calling `run` for each given Input. diff --git a/src/intelligence_layer/core/tracer/tracer.py b/src/intelligence_layer/core/tracer/tracer.py index c5cb4a75f..81cf0f588 100644 --- a/src/intelligence_layer/core/tracer/tracer.py +++ b/src/intelligence_layer/core/tracer/tracer.py @@ -117,7 +117,7 @@ def span( self, name: str, timestamp: Optional[datetime] = None, - ) -> "Span": # TODO + ) -> "Span": """Generate a span from the current span or logging instance. Allows for grouping multiple logs and duration together as a single, logical step in the @@ -129,8 +129,7 @@ def span( Args: name: A descriptive name of what this span will contain logs about. - timestamp: optional override of the starting timestamp. Otherwise should default to now. - trace_id: optional override of a trace id. Otherwise it creates a new default id. + timestamp: Override of the starting timestamp. Defaults to call time. Returns: An instance of a Span. @@ -155,32 +154,19 @@ def task_span( Args: task_name: The name of the task that is being logged input: The input for the task that is being logged. - timestamp: optional override of the starting timestamp. Otherwise should default to now. - trace_id: optional override of a trace id. Otherwise it creates a new default id. - + timestamp: Override of the starting timestamp. Defaults to call time. Returns: An instance of a TaskSpan. """ ... - def ensure_id(self, id: Optional[str]) -> str: - """Returns a valid id for tracing. - - Args: - id: current id to use if present. - Returns: - `id` if present, otherwise a new unique ID. - """ - - return id if id is not None else str(uuid4()) - @abstractmethod def export_for_viewing(self) -> Sequence[ExportedSpan]: """Converts the trace to a format that can be read by the trace viewer. - The format is inspired by the OpenTelemetry Format, but does not abide by it, - because it is too complex for our use-case. + The format is inspired by the OpenTelemetry Format, but does not abide by it. + Specifically, it cuts away unused concepts, such as links. Returns: A list of spans which includes the current span and all its child spans. @@ -199,12 +185,22 @@ class Span(Tracer, AbstractContextManager["Span"]): Logs and other spans can be nested underneath. - Can also be used as a Context Manager to easily capture the start and end time, and keep the - span only in scope while it is active. + Can also be used as a context manager to easily capture the start and end time, and keep the + span only in scope while it is open. + + Attributes: + context: The context of the current span. If the span is a root span, the trace id will be equal to its span id. + status_code: Status of the span. Will be "OK" unless the span was interrupted by an exception. """ def __init__(self, context: Optional[Context] = None): - # super().__init__() + """Creates a span from the context of its parent. + + Initializes the spans `context` based on the parent context and its `status_code`. + + Args: + context: Context of the parent. Defaults to None. + """ span_id = str(uuid4()) if context is None: trace_id = span_id @@ -254,16 +250,6 @@ def end(self, timestamp: Optional[datetime] = None) -> None: """ self._closed = True - def ensure_id(self, id: str | None) -> str: - """Returns a valid id for tracing. - - Args: - id: current id to use if present. - Returns: - `id` if present, otherwise id of this `Span` - """ - return id if id is not None else self.id() - def __exit__( self, exc_type: Optional[type[BaseException]], diff --git a/tests/core/test_task.py b/tests/core/test_task.py index bdc2fa4e8..429b69c84 100644 --- a/tests/core/test_task.py +++ b/tests/core/test_task.py @@ -4,7 +4,7 @@ from time import sleep from typing import Callable -from intelligence_layer.core import InMemorySpan, InMemoryTracer, NoOpTracer, TaskSpan +from intelligence_layer.core import InMemoryTracer, NoOpTracer, TaskSpan from intelligence_layer.core.task import MAX_CONCURRENCY, Task @@ -111,21 +111,3 @@ def test_sub_tasks_do_not_introduce_multiple_task_spans() -> None: assert isinstance(tracer.entries[0], TaskSpan) assert tracer.entries[0].entries assert not isinstance(tracer.entries[0].entries[0], TaskSpan) - - -def test_ids_are_set_in_concurrent_run() -> None: - tracer = InMemoryTracer() - task = DeadlockDetector() - - task.run_concurrently([None] * MAX_CONCURRENCY, tracer, trace_id="ID") - assert tracer.entries - assert tracer.entries[0].id() == "ID" - - -def test_ids_are_equal_for_multiple_subtasks() -> None: - tracer = InMemoryTracer() - NestedTask().run(None, tracer, "ID") - assert isinstance(tracer.entries[0], InMemorySpan) - assert tracer.entries[0].id() == "ID" - assert isinstance(tracer.entries[0].entries[0], InMemorySpan) - assert tracer.entries[0].entries[0].id() == "ID" diff --git a/tests/core/test_temp.py b/tests/core/test_temp.py index e72415d54..fad3ce555 100644 --- a/tests/core/test_temp.py +++ b/tests/core/test_temp.py @@ -1,14 +1,3 @@ -import contextlib -import json -import os -import time -from pathlib import Path -from typing import Any, Iterator, Optional -from unittest.mock import Mock - -import pytest -import requests -from aleph_alpha_client import Prompt from opentelemetry import trace from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter from opentelemetry.sdk.resources import SERVICE_NAME, Resource @@ -16,23 +5,8 @@ from opentelemetry.sdk.trace.export import BatchSpanProcessor from pytest import fixture -from intelligence_layer.core import ( - CompleteInput, - CompleteOutput, - CompositeTracer, - FileTracer, - InMemorySpan, - InMemoryTaskSpan, - InMemoryTracer, - LogEntry, - LuminousControlModel, - OpenTelemetryTracer, - Task, - TaskSpan, - utc_now, -) -from intelligence_layer.core.tracer.persistent_tracer import TracerLogEntryFailed -from intelligence_layer.core.tracer.tracer import ErrorValue +from intelligence_layer.core import OpenTelemetryTracer + @fixture def open_telemetry_tracer() -> tuple[str, OpenTelemetryTracer]: @@ -50,8 +24,10 @@ def open_telemetry_tracer() -> tuple[str, OpenTelemetryTracer]: def test_temp_1(open_telemetry_tracer): print("test_1") + def test_temp_2(open_telemetry_tracer): print("test_2") + def test_temp_3(open_telemetry_tracer): - print("test_3") \ No newline at end of file + print("test_3") diff --git a/tests/evaluation/test_domain.py b/tests/evaluation/test_domain.py index e4db9d584..df2ef0edc 100644 --- a/tests/evaluation/test_domain.py +++ b/tests/evaluation/test_domain.py @@ -1,6 +1,7 @@ from pytest import raises -from intelligence_layer.core import InMemorySpan, InMemoryTaskSpan, LogEntry, utc_now +from intelligence_layer.core import utc_now +from intelligence_layer.core.tracer.in_memory_tracer import InMemoryTracer from intelligence_layer.evaluation import ( AggregationOverview, EvaluationFailed, @@ -14,22 +15,13 @@ def test_to_trace_entry() -> None: now = utc_now() - entry = _to_trace_entry( - InMemoryTaskSpan( - name="task", - input="input", - output="output", - start_timestamp=now, - end_timestamp=now, - entries=[ - LogEntry(message="message", value="value", trace_id="ID"), - InMemorySpan( - name="span", start_timestamp=now, end_timestamp=now, trace_id="ID" - ), - ], - trace_id="ID", - ) - ) + span = InMemoryTracer().task_span("task", timestamp=now, input="input") + span.span("span", now).end(now) + span.log(message="message", value="value", timestamp=now) + span.record_output("output") + span.end(now) + + entry = _to_trace_entry(span) assert entry == TaskSpanTrace( name="task", @@ -38,8 +30,8 @@ def test_to_trace_entry() -> None: start=now, end=now, traces=[ - LogTrace(message="message", value="value"), SpanTrace(name="span", traces=[], start=now, end=now), + LogTrace(message="message", value="value"), ], ) @@ -49,7 +41,10 @@ def test_deserialize_task_trace() -> None: name="task", start=utc_now(), end=utc_now(), - traces=[], + traces=[ + SpanTrace(name="span", traces=[], start=utc_now(), end=utc_now()), + LogTrace(message="message", value="value"), + ], input=[{"a": "b"}], output=["c"], )