From 9e41a1b925695c0b76a04d8b89fd89cadc9eb327 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Niklas=20K=C3=B6hnecke?= Date: Thu, 23 May 2024 11:28:50 +0200 Subject: [PATCH] Documentation and test fixes --- src/intelligence_layer/core/__init__.py | 14 +- src/intelligence_layer/core/task.py | 2 - .../core/tracer/composite_tracer.py | 7 +- .../core/tracer/file_tracer.py | 10 +- .../core/tracer/in_memory_tracer.py | 123 ++++------ .../core/tracer/open_telemetry_tracer.py | 17 +- .../core/tracer/persistent_tracer.py | 200 +++++++++++++++- src/intelligence_layer/core/tracer/tracer.py | 225 +++--------------- .../evaluation/run/file_run_repository.py | 2 +- tests/core/test_task.py | 20 +- tests/core/test_temp.py | 57 ----- tests/core/tracer/test_file_tracer.py | 19 +- tests/core/tracer/test_in_memory_tracer.py | 6 +- .../core/tracer/test_open_telemetry_tracer.py | 38 ++- tests/core/tracer/test_tracer.py | 4 +- tests/evaluation/test_domain.py | 33 ++- 16 files changed, 354 insertions(+), 423 deletions(-) delete mode 100644 tests/core/test_temp.py diff --git a/src/intelligence_layer/core/__init__.py b/src/intelligence_layer/core/__init__.py index 02d25369a..e5b4baa32 100644 --- a/src/intelligence_layer/core/__init__.py +++ b/src/intelligence_layer/core/__init__.py @@ -44,22 +44,22 @@ from .tracer.in_memory_tracer import InMemorySpan as InMemorySpan from .tracer.in_memory_tracer import InMemoryTaskSpan as InMemoryTaskSpan from .tracer.in_memory_tracer import InMemoryTracer as InMemoryTracer +from .tracer.in_memory_tracer import LogEntry as LogEntry from .tracer.open_telemetry_tracer import OpenTelemetryTracer as OpenTelemetryTracer +from .tracer.persistent_tracer import EndSpan as EndSpan +from .tracer.persistent_tracer import EndTask as EndTask +from .tracer.persistent_tracer import LogLine as LogLine from .tracer.persistent_tracer import PersistentSpan as PersistentSpan from .tracer.persistent_tracer import PersistentTaskSpan as PersistentTaskSpan from .tracer.persistent_tracer import PersistentTracer as PersistentTracer +from .tracer.persistent_tracer import PlainEntry as PlainEntry +from .tracer.persistent_tracer import StartSpan as StartSpan +from .tracer.persistent_tracer import StartTask as StartTask from .tracer.persistent_tracer import TracerLogEntryFailed as TracerLogEntryFailed -from .tracer.tracer import EndSpan as EndSpan -from .tracer.tracer import EndTask as EndTask from .tracer.tracer import JsonSerializer as JsonSerializer -from .tracer.tracer import LogEntry as LogEntry -from .tracer.tracer import LogLine as LogLine from .tracer.tracer import NoOpTracer as NoOpTracer -from .tracer.tracer import PlainEntry as PlainEntry from .tracer.tracer import PydanticSerializable as PydanticSerializable from .tracer.tracer import Span as Span -from .tracer.tracer import StartSpan as StartSpan -from .tracer.tracer import StartTask as StartTask from .tracer.tracer import TaskSpan as TaskSpan from .tracer.tracer import Tracer as Tracer from .tracer.tracer import utc_now as utc_now diff --git a/src/intelligence_layer/core/task.py b/src/intelligence_layer/core/task.py index 0928c5fea..d63d4d2a2 100644 --- a/src/intelligence_layer/core/task.py +++ b/src/intelligence_layer/core/task.py @@ -68,7 +68,6 @@ def run(self, input: Input, tracer: Tracer) -> Output: Args: input: Generic input defined by the task implementation tracer: The `Tracer` used for tracing. - trace_id: An optional id of the run, used to track its trace. Returns: Generic output defined by the task implementation. 
""" @@ -95,7 +94,6 @@ def run_concurrently( concurrency_limit: An optional additional limit for the number of concurrently executed task for this method call. This can be used to prevent queue-full or similar error of downstream APIs when the global concurrency limit is too high for a certain task. - trace_id: An optional id of the run, used to track its trace. Returns: The Outputs generated by calling `run` for each given Input. diff --git a/src/intelligence_layer/core/tracer/composite_tracer.py b/src/intelligence_layer/core/tracer/composite_tracer.py index 5f89c5ef4..5f1e44c80 100644 --- a/src/intelligence_layer/core/tracer/composite_tracer.py +++ b/src/intelligence_layer/core/tracer/composite_tracer.py @@ -6,6 +6,7 @@ ExportedSpan, PydanticSerializable, Span, + SpanStatus, TaskSpan, Tracer, utc_now, @@ -96,7 +97,7 @@ def end(self, timestamp: Optional[datetime] = None) -> None: tracer.end(timestamp) @property - def status_code(self): + def status_code(self) -> SpanStatus: status_codes = {tracer.status_code for tracer in self.tracers} if len(status_codes) > 1: raise ValueError( @@ -105,9 +106,9 @@ def status_code(self): return next(iter(status_codes)) @status_code.setter - def status_code(self, value): + def status_code(self, span_status: SpanStatus) -> None: for tracer in self.tracers: - tracer.status_code = value + tracer.status_code = span_status class CompositeTaskSpan(CompositeSpan[TaskSpan], TaskSpan): diff --git a/src/intelligence_layer/core/tracer/file_tracer.py b/src/intelligence_layer/core/tracer/file_tracer.py index 4615e9227..5c7813caa 100644 --- a/src/intelligence_layer/core/tracer/file_tracer.py +++ b/src/intelligence_layer/core/tracer/file_tracer.py @@ -2,16 +2,18 @@ from json import loads from pathlib import Path from typing import Optional +from uuid import UUID from pydantic import BaseModel from intelligence_layer.core.tracer.in_memory_tracer import InMemoryTracer from intelligence_layer.core.tracer.persistent_tracer import ( + LogLine, PersistentSpan, PersistentTaskSpan, PersistentTracer, ) -from intelligence_layer.core.tracer.tracer import Context, LogLine, PydanticSerializable +from intelligence_layer.core.tracer.tracer import Context, PydanticSerializable class FileTracer(PersistentTracer): @@ -30,11 +32,11 @@ class FileTracer(PersistentTracer): the child-elements for a tracer can be identified by referring to this id as parent. 
""" - def __init__(self, log_file_path: Path | str, *args, **kwargs) -> None: + def __init__(self, log_file_path: Path | str) -> None: super().__init__() self._log_file_path = Path(log_file_path) - def _log_entry(self, id: str, entry: BaseModel) -> None: + def _log_entry(self, id: UUID, entry: BaseModel) -> None: self._log_file_path.parent.mkdir(parents=True, exist_ok=True) with self._log_file_path.open(mode="a", encoding="utf-8") as f: f.write( @@ -66,7 +68,7 @@ def task_span( self._log_task(task, task_name, input, timestamp) return task - def trace(self, trace_id: Optional[str] = None) -> InMemoryTracer: + def traces(self, trace_id: Optional[str] = None) -> InMemoryTracer: with self._log_file_path.open("r") as f: traces = (LogLine.model_validate(loads(line)) for line in f) filtered_traces = ( diff --git a/src/intelligence_layer/core/tracer/in_memory_tracer.py b/src/intelligence_layer/core/tracer/in_memory_tracer.py index de1d3bae7..c44ad99ac 100644 --- a/src/intelligence_layer/core/tracer/in_memory_tracer.py +++ b/src/intelligence_layer/core/tracer/in_memory_tracer.py @@ -5,29 +5,24 @@ import requests import rich -from pydantic import SerializeAsAny +from pydantic import BaseModel, Field, SerializeAsAny from requests import HTTPError +from rich.panel import Panel +from rich.syntax import Syntax from rich.tree import Tree from intelligence_layer.core.tracer.tracer import ( Context, - EndSpan, - EndTask, Event, ExportedSpan, ExportedSpanList, - LogEntry, - LogLine, - PlainEntry, + JsonSerializer, PydanticSerializable, Span, SpanAttributes, - StartSpan, - StartTask, TaskSpan, TaskSpanAttributes, Tracer, - _render_log_value, utc_now, ) @@ -176,6 +171,8 @@ def export_for_viewing(self) -> Sequence[ExportedSpan]: raise RuntimeError( "Span is not closed. A Span must be closed before it is exported for viewing." 
) + assert self.end_timestamp is not None + logs: list[LogEntry] = [] exported_spans: list[ExportedSpan] = [] for entry in self.entries: @@ -238,72 +235,42 @@ def _rich_render_(self) -> Tree: return tree -class TreeBuilder: - def __init__(self) -> None: - self.root = InMemoryTracer() - self.tracers: dict[UUID, InMemoryTracer] = dict() - self.tasks: dict[UUID, InMemoryTaskSpan] = dict() - self.spans: dict[UUID, InMemorySpan] = dict() - - def start_task(self, log_line: LogLine) -> None: - start_task = StartTask.model_validate(log_line.entry) - converted_span = InMemoryTaskSpan( - name=start_task.name, - input=start_task.input, - start_timestamp=start_task.start, - context=Context( - trace_id=start_task.trace_id, span_id=str(start_task.parent) - ) - if start_task.trace_id != str(start_task.uuid) - else None, - ) - # if root, also change the trace id - if converted_span.context.trace_id == converted_span.context.span_id: - converted_span.context.trace_id = str(start_task.uuid) - converted_span.context.span_id = str(start_task.uuid) - self.tracers.get(start_task.parent, self.root).entries.append(converted_span) - self.tracers[start_task.uuid] = converted_span - self.tasks[start_task.uuid] = converted_span - - def end_task(self, log_line: LogLine) -> None: - end_task = EndTask.model_validate(log_line.entry) - task_span = self.tasks[end_task.uuid] - task_span.record_output(end_task.output) - task_span.status_code = end_task.status_code - task_span.end(end_task.end) - - def start_span(self, log_line: LogLine) -> None: - start_span = StartSpan.model_validate(log_line.entry) - converted_span = InMemorySpan( - name=start_span.name, - start_timestamp=start_span.start, - context=Context( - trace_id=start_span.trace_id, span_id=str(start_span.parent) - ) - if start_span.trace_id != str(start_span.uuid) - else None, - ) - # if root, also change the trace id - if converted_span.context.trace_id == converted_span.context.span_id: - converted_span.context.trace_id = str(start_span.uuid) - converted_span.context.span_id = str(start_span.uuid) - - self.tracers.get(start_span.parent, self.root).entries.append(converted_span) - self.tracers[start_span.uuid] = converted_span - self.spans[start_span.uuid] = converted_span - - def end_span(self, log_line: LogLine) -> None: - end_span = EndSpan.model_validate(log_line.entry) - span = self.spans[end_span.uuid] - span.status_code = end_span.status_code - span.end(end_span.end) - - def plain_entry(self, log_line: LogLine) -> None: - plain_entry = PlainEntry.model_validate(log_line.entry) - entry = LogEntry( - message=plain_entry.message, - value=plain_entry.value, - timestamp=plain_entry.timestamp, - trace_id=plain_entry.trace_id, - ) - self.tracers[plain_entry.parent].entries.append(entry) +class LogEntry(BaseModel): + """An individual log entry, currently used to represent individual logs by the + `InMemoryTracer`. + + Attributes: + message: A description of the value you are logging, such as the step in the task this + is related to. + value: The relevant data you want to log. Can be anything that is serializable by + Pydantic, which gives the tracers flexibility in how they store and emit the logs. + timestamp: The time that the log was emitted. + id: The ID of the trace to which this log entry belongs. 
+ """ + + message: str + value: SerializeAsAny[PydanticSerializable] + timestamp: datetime = Field(default_factory=datetime.utcnow) + trace_id: UUID + + def _rich_render_(self) -> Panel: + """Renders the trace via classes in the `rich` package""" + return _render_log_value(self.value, self.message) + + def _ipython_display_(self) -> None: + """Default rendering for Jupyter notebooks""" + from rich import print + + print(self._rich_render_()) + + +def _render_log_value(value: PydanticSerializable, title: str) -> Panel: + value = value if isinstance(value, BaseModel) else JsonSerializer(root=value) + return Panel( + Syntax( + value.model_dump_json(indent=2, exclude_defaults=True), + "json", + word_wrap=True, + ), + title=title, + ) diff --git a/src/intelligence_layer/core/tracer/open_telemetry_tracer.py b/src/intelligence_layer/core/tracer/open_telemetry_tracer.py index de0386b17..f0ee03089 100644 --- a/src/intelligence_layer/core/tracer/open_telemetry_tracer.py +++ b/src/intelligence_layer/core/tracer/open_telemetry_tracer.py @@ -3,17 +3,21 @@ from opentelemetry.context import attach, detach from opentelemetry.trace import Span as OpenTSpan +from opentelemetry.trace import StatusCode from opentelemetry.trace import Tracer as OpenTTracer from opentelemetry.trace import set_span_in_context +from pydantic import BaseModel, SerializeAsAny from intelligence_layer.core.tracer.tracer import ( Context, ExportedSpan, + JsonSerializer, PydanticSerializable, Span, + SpanStatus, + SpanType, TaskSpan, Tracer, - _serialize, ) @@ -30,6 +34,7 @@ def span( ) -> "OpenTelemetrySpan": tracer_span = self._tracer.start_span( name, + attributes={"type": SpanType.SPAN.value}, start_time=None if not timestamp else _open_telemetry_timestamp(timestamp), ) token = attach(set_span_in_context(tracer_span)) @@ -43,7 +48,7 @@ def task_span( ) -> "OpenTelemetryTaskSpan": tracer_span = self._tracer.start_span( task_name, - attributes={"input": _serialize(input)}, + attributes={"input": _serialize(input), "type": SpanType.TASK_SPAN.value}, start_time=None if not timestamp else _open_telemetry_timestamp(timestamp), ) token = attach(set_span_in_context(tracer_span)) @@ -86,6 +91,9 @@ def log( def end(self, timestamp: Optional[datetime] = None) -> None: super().end(timestamp) + self.open_ts_span.set_status( + StatusCode.OK if self.status_code == SpanStatus.OK else StatusCode.ERROR + ) detach(self._token) self.open_ts_span.end( _open_telemetry_timestamp(timestamp) if timestamp is not None else None @@ -105,3 +113,8 @@ def _open_telemetry_timestamp(t: datetime) -> int: # Open telemetry expects *nanoseconds* since epoch t_float = t.timestamp() * 1e9 return int(t_float) + + +def _serialize(s: SerializeAsAny[PydanticSerializable]) -> str: + value = s if isinstance(s, BaseModel) else JsonSerializer(root=s) + return value.model_dump_json() diff --git a/src/intelligence_layer/core/tracer/persistent_tracer.py b/src/intelligence_layer/core/tracer/persistent_tracer.py index 1d0b3b7c0..9597229f8 100644 --- a/src/intelligence_layer/core/tracer/persistent_tracer.py +++ b/src/intelligence_layer/core/tracer/persistent_tracer.py @@ -1,41 +1,62 @@ from abc import ABC, abstractmethod from datetime import datetime from typing import Iterable, Optional, Sequence -from uuid import uuid4 +from uuid import UUID, uuid4 -from pydantic import BaseModel +from pydantic import BaseModel, SerializeAsAny -from intelligence_layer.core.tracer.in_memory_tracer import InMemoryTracer, TreeBuilder +from intelligence_layer.core.tracer.in_memory_tracer import ( + 
InMemorySpan, + InMemoryTaskSpan, + InMemoryTracer, + LogEntry, +) from intelligence_layer.core.tracer.tracer import ( - EndSpan, - EndTask, + Context, ExportedSpan, - LogLine, - PlainEntry, PydanticSerializable, Span, - StartSpan, - StartTask, + SpanStatus, TaskSpan, Tracer, utc_now, ) +class LogLine(BaseModel): + """Represents a complete log-line. + + Attributes: + entry_type: The type of the entry. This is the class-name of one of the classes + representing a log-entry (e.g. "StartTask"). + entry: The actual entry. + + """ + + trace_id: UUID + entry_type: str + entry: SerializeAsAny[PydanticSerializable] + + class PersistentTracer(Tracer, ABC): def __init__(self) -> None: self.current_id = uuid4() @abstractmethod - def _log_entry(self, id: str, entry: BaseModel) -> None: + def _log_entry(self, id: UUID, entry: BaseModel) -> None: pass @abstractmethod - def trace(self, trace_id: str) -> InMemoryTracer: + def traces(self) -> InMemoryTracer: + """Returns all traces of the given tracer. + + Returns: + An InMemoryTracer that contains all traces of the tracer. + """ pass def export_for_viewing(self) -> Sequence[ExportedSpan]: - return self.trace().export_for_viewing() + return self.traces().export_for_viewing() def _log_span( self, span: "PersistentSpan", name: str, timestamp: Optional[datetime] = None @@ -177,3 +198,158 @@ def __init__(self, error_message: str, id: str) -> None: f"Log entry with id {id} failed with error message {error_message}." ) self.description = error_message + + +class StartTask(BaseModel): + """Represents the payload/entry of a log-line indicating that a `TaskSpan` was opened through `Tracer.task_span`. + + Attributes: + uuid: A unique id for the opened `TaskSpan`. + parent: The unique id of the parent element of opened `TaskSpan`. + This could refer to either a surrounding `TaskSpan`, `Span` or the top-level `Tracer`. + name: The name of the task. + start: The timestamp when this `Task` was started (i.e. `run` was called). + input: The `Input` (i.e. parameter for `run`) the `Task` was started with. + trace_id: The trace id of the opened `TaskSpan`. + + """ + + uuid: UUID + parent: UUID + name: str + start: datetime + input: SerializeAsAny[PydanticSerializable] + trace_id: UUID + + +class EndTask(BaseModel): + """Represents the payload/entry of a log-line that indicates that a `TaskSpan` ended (i.e. the context-manager exited). + + Attributes: + uuid: The uuid of the corresponding `StartTask`. + end: the timestamp when this `Task` completed (i.e. `run` returned). + output: the `Output` (i.e. return value of `run`) the `Task` returned. + """ + + uuid: UUID + end: datetime + output: SerializeAsAny[PydanticSerializable] + status_code: SpanStatus = SpanStatus.OK + + +class StartSpan(BaseModel): + """Represents the payload/entry of a log-line indicating that a `Span` was opened through `Tracer.span`. + + Attributes: + uuid: A unique id for the opened `Span`. + parent: The unique id of the parent element of opened `TaskSpan`. + This could refer to either a surrounding `TaskSpan`, `Span` or the top-level `Tracer`. + name: The name of the task. + start: The timestamp when this `Span` was started. + trace_id: The ID of the trace this span belongs to. + """ + + uuid: UUID + parent: UUID + name: str + start: datetime + trace_id: UUID + + +class EndSpan(BaseModel): + """Represents the payload/entry of a log-line that indicates that a `Span` ended. + + Attributes: + uuid: The uuid of the corresponding `StartSpan`. + end: the timestamp when this `Span` completed. 
+ """ + + uuid: UUID + end: datetime + status_code: SpanStatus = SpanStatus.OK + + +class PlainEntry(BaseModel): + """Represents a plain log-entry created through `Tracer.log`. + + Attributes: + message: the message-parameter of `Tracer.log` + value: the value-parameter of `Tracer.log` + timestamp: the timestamp when `Tracer.log` was called. + parent: The unique id of the parent element of the log. + This could refer to either a surrounding `TaskSpan`, `Span` or the top-level `Tracer`. + trace_id: The ID of the trace this entry belongs to. + """ + + message: str + value: SerializeAsAny[PydanticSerializable] + timestamp: datetime + parent: UUID + trace_id: UUID + + +class TreeBuilder: + def __init__(self) -> None: + self.root = InMemoryTracer() + self.tracers: dict[UUID, InMemoryTracer] = dict() + self.tasks: dict[UUID, InMemoryTaskSpan] = dict() + self.spans: dict[UUID, InMemorySpan] = dict() + + def start_task(self, log_line: LogLine) -> None: + start_task = StartTask.model_validate(log_line.entry) + converted_span = InMemoryTaskSpan( + name=start_task.name, + input=start_task.input, + start_timestamp=start_task.start, + context=Context(trace_id=start_task.trace_id, span_id=start_task.parent) + if start_task.trace_id != start_task.uuid + else None, + ) + # if root, also change the trace id + if converted_span.context.trace_id == converted_span.context.span_id: + converted_span.context.trace_id = start_task.uuid + converted_span.context.span_id = start_task.uuid + self.tracers.get(start_task.parent, self.root).entries.append(converted_span) + self.tracers[start_task.uuid] = converted_span + self.tasks[start_task.uuid] = converted_span + + def end_task(self, log_line: LogLine) -> None: + end_task = EndTask.model_validate(log_line.entry) + task_span = self.tasks[end_task.uuid] + task_span.record_output(end_task.output) + task_span.status_code = end_task.status_code + task_span.end(end_task.end) + + def start_span(self, log_line: LogLine) -> None: + start_span = StartSpan.model_validate(log_line.entry) + converted_span = InMemorySpan( + name=start_span.name, + start_timestamp=start_span.start, + context=Context(trace_id=start_span.trace_id, span_id=start_span.parent) + if start_span.trace_id != start_span.uuid + else None, + ) + # if root, also change the trace id + if converted_span.context.trace_id == converted_span.context.span_id: + converted_span.context.trace_id = start_span.uuid + converted_span.context.span_id = start_span.uuid + + self.tracers.get(start_span.parent, self.root).entries.append(converted_span) + self.tracers[start_span.uuid] = converted_span + self.spans[start_span.uuid] = converted_span + + def end_span(self, log_line: LogLine) -> None: + end_span = EndSpan.model_validate(log_line.entry) + span = self.spans[end_span.uuid] + span.status_code = end_span.status_code + span.end(end_span.end) + + def plain_entry(self, log_line: LogLine) -> None: + plain_entry = PlainEntry.model_validate(log_line.entry) + entry = LogEntry( + message=plain_entry.message, + value=plain_entry.value, + timestamp=plain_entry.timestamp, + trace_id=plain_entry.trace_id, + ) + self.tracers[plain_entry.parent].entries.append(entry) diff --git a/src/intelligence_layer/core/tracer/tracer.py b/src/intelligence_layer/core/tracer/tracer.py index c5cb4a75f..f97a6c9e4 100644 --- a/src/intelligence_layer/core/tracer/tracer.py +++ b/src/intelligence_layer/core/tracer/tracer.py @@ -4,12 +4,10 @@ from datetime import datetime, timezone from enum import Enum from types import TracebackType -from typing import 
TYPE_CHECKING, List, Mapping, Optional, Sequence +from typing import TYPE_CHECKING, Mapping, Optional, Sequence from uuid import UUID, uuid4 from pydantic import BaseModel, Field, RootModel, SerializeAsAny -from rich.panel import Panel -from rich.syntax import Syntax from typing_extensions import Self, TypeAliasType if TYPE_CHECKING: @@ -79,14 +77,14 @@ class SpanStatus(Enum): class Context(BaseModel): - trace_id: str - span_id: str + trace_id: UUID + span_id: UUID class ExportedSpan(BaseModel): context: Context name: str | None - parent_id: str | None + parent_id: UUID | None start_time: datetime end_time: datetime attributes: SpanAttributes @@ -95,8 +93,7 @@ class ExportedSpan(BaseModel): # we ignore the links concept -class ExportedSpanList(RootModel): - root: List[ExportedSpan] +ExportedSpanList = RootModel[Sequence[ExportedSpan]] class Tracer(ABC): @@ -117,7 +114,7 @@ def span( self, name: str, timestamp: Optional[datetime] = None, - ) -> "Span": # TODO + ) -> "Span": """Generate a span from the current span or logging instance. Allows for grouping multiple logs and duration together as a single, logical step in the @@ -129,8 +126,7 @@ def span( Args: name: A descriptive name of what this span will contain logs about. - timestamp: optional override of the starting timestamp. Otherwise should default to now. - trace_id: optional override of a trace id. Otherwise it creates a new default id. + timestamp: Override of the starting timestamp. Defaults to call time. Returns: An instance of a Span. @@ -143,7 +139,7 @@ def task_span( task_name: str, input: PydanticSerializable, timestamp: Optional[datetime] = None, - ) -> "TaskSpan": # TODO + ) -> "TaskSpan": """Generate a task-specific span from the current span or logging instance. Allows for grouping multiple logs together, as well as the task's specific input, output, @@ -155,32 +151,19 @@ def task_span( Args: task_name: The name of the task that is being logged input: The input for the task that is being logged. - timestamp: optional override of the starting timestamp. Otherwise should default to now. - trace_id: optional override of a trace id. Otherwise it creates a new default id. - + timestamp: Override of the starting timestamp. Defaults to call time. Returns: An instance of a TaskSpan. """ ... - def ensure_id(self, id: Optional[str]) -> str: - """Returns a valid id for tracing. - - Args: - id: current id to use if present. - Returns: - `id` if present, otherwise a new unique ID. - """ - - return id if id is not None else str(uuid4()) - @abstractmethod def export_for_viewing(self) -> Sequence[ExportedSpan]: """Converts the trace to a format that can be read by the trace viewer. - The format is inspired by the OpenTelemetry Format, but does not abide by it, - because it is too complex for our use-case. + The format is inspired by the OpenTelemetry Format, but does not abide by it. + Specifically, it cuts away unused concepts, such as links. Returns: A list of spans which includes the current span and all its child spans. @@ -199,13 +182,25 @@ class Span(Tracer, AbstractContextManager["Span"]): Logs and other spans can be nested underneath. - Can also be used as a Context Manager to easily capture the start and end time, and keep the - span only in scope while it is active. + Can also be used as a context manager to easily capture the start and end time, and keep the + span only in scope while it is open. + + Attributes: + context: The context of the current span. If the span is a root span, the trace id will be equal to its span id. 
+ status_code: Status of the span. Will be "OK" unless the span was interrupted by an exception. """ + context: Context + def __init__(self, context: Optional[Context] = None): - # super().__init__() - span_id = str(uuid4()) + """Creates a span from the context of its parent. + + Initializes the spans `context` based on the parent context and its `status_code`. + + Args: + context: Context of the parent. Defaults to None. + """ + span_id = uuid4() if context is None: trace_id = span_id else: @@ -244,26 +239,16 @@ def log( @abstractmethod def end(self, timestamp: Optional[datetime] = None) -> None: - """Marks the Span as done, with the end time of the span. The Span should be regarded + """Marks the Span as closed, with the end time of the span. The Span should be regarded as complete, and no further logging should happen with it. Ending a closed span in undefined behavior. Args: - timestamp: Optional override of the timestamp, otherwise should be set to now. + timestamp: Optional override of the timestamp. Defaults to call time. """ self._closed = True - def ensure_id(self, id: str | None) -> str: - """Returns a valid id for tracing. - - Args: - id: current id to use if present. - Returns: - `id` if present, otherwise id of this `Span` - """ - return id if id is not None else self.id() - def __exit__( self, exc_type: Optional[type[BaseException]], @@ -296,9 +281,6 @@ def record_output(self, output: PydanticSerializable) -> None: """Record :class:`Task` output. Since a Context Manager can't provide this in the `__exit__` method, output should be captured once it is generated. - This should be handled automatically within the execution of the task, and it is - unlikely this would be called directly by you. - Args: output: The output of the task that is being logged. """ @@ -367,152 +349,3 @@ def export_for_viewing(self) -> Sequence[ExportedSpan]: class JsonSerializer(RootModel[PydanticSerializable]): root: SerializeAsAny[PydanticSerializable] - - -def _render_log_value(value: PydanticSerializable, title: str) -> Panel: - value = value if isinstance(value, BaseModel) else JsonSerializer(root=value) - return Panel( - Syntax( - value.model_dump_json(indent=2, exclude_defaults=True), - "json", - word_wrap=True, - ), - title=title, - ) - - -class LogEntry(BaseModel): - """An individual log entry, currently used to represent individual logs by the - `InMemoryTracer`. - - Attributes: - message: A description of the value you are logging, such as the step in the task this - is related to. - value: The relevant data you want to log. Can be anything that is serializable by - Pydantic, which gives the tracers flexibility in how they store and emit the logs. - timestamp: The time that the log was emitted. - id: The ID of the trace to which this log entry belongs. - """ - - message: str - value: SerializeAsAny[PydanticSerializable] - timestamp: datetime = Field(default_factory=datetime.utcnow) - trace_id: str - - def _rich_render_(self) -> Panel: - """Renders the trace via classes in the `rich` package""" - return _render_log_value(self.value, self.message) - - def _ipython_display_(self) -> None: - """Default rendering for Jupyter notebooks""" - from rich import print - - print(self._rich_render_()) - - -class StartTask(BaseModel): - """Represents the payload/entry of a log-line indicating that a `TaskSpan` was opened through `Tracer.task_span`. - - Attributes: - uuid: A unique id for the opened `TaskSpan`. - parent: The unique id of the parent element of opened `TaskSpan`. 
- This could refer to either a surrounding `TaskSpan`, `Span` or the top-level `Tracer`. - name: The name of the task. - start: The timestamp when this `Task` was started (i.e. `run` was called). - input: The `Input` (i.e. parameter for `run`) the `Task` was started with. - trace_id: The trace id of the opened `TaskSpan`. - - """ - - uuid: UUID - parent: UUID - name: str - start: datetime - input: SerializeAsAny[PydanticSerializable] - trace_id: str - - -class EndTask(BaseModel): - """Represents the payload/entry of a log-line that indicates that a `TaskSpan` ended (i.e. the context-manager exited). - - Attributes: - uuid: The uuid of the corresponding `StartTask`. - end: the timestamp when this `Task` completed (i.e. `run` returned). - output: the `Output` (i.e. return value of `run`) the `Task` returned. - """ - - uuid: UUID - end: datetime - output: SerializeAsAny[PydanticSerializable] - status_code: SpanStatus = SpanStatus.OK - - -class StartSpan(BaseModel): - """Represents the payload/entry of a log-line indicating that a `Span` was opened through `Tracer.span`. - - Attributes: - uuid: A unique id for the opened `Span`. - parent: The unique id of the parent element of opened `TaskSpan`. - This could refer to either a surrounding `TaskSpan`, `Span` or the top-level `Tracer`. - name: The name of the task. - start: The timestamp when this `Span` was started. - trace_id: The ID of the trace this span belongs to. - """ - - uuid: UUID - parent: UUID - name: str - start: datetime - trace_id: str - - -class EndSpan(BaseModel): - """Represents the payload/entry of a log-line that indicates that a `Span` ended. - - Attributes: - uuid: The uuid of the corresponding `StartSpan`. - end: the timestamp when this `Span` completed. - """ - - uuid: UUID - end: datetime - status_code: SpanStatus = SpanStatus.OK - - -class PlainEntry(BaseModel): - """Represents a plain log-entry created through `Tracer.log`. - - Attributes: - message: the message-parameter of `Tracer.log` - value: the value-parameter of `Tracer.log` - timestamp: the timestamp when `Tracer.log` was called. - parent: The unique id of the parent element of the log. - This could refer to either a surrounding `TaskSpan`, `Span` or the top-level `Tracer`. - trace_id: The ID of the trace this entry belongs to. - """ - - message: str - value: SerializeAsAny[PydanticSerializable] - timestamp: datetime - parent: UUID - trace_id: str - - -class LogLine(BaseModel): - """Represents a complete log-line. - - Attributes: - entry_type: The type of the entry. This is the class-name of one of the classes - representing a log-entry (e.g. "StartTask"). - entry: The actual entry. 
- - """ - - trace_id: str - entry_type: str - entry: SerializeAsAny[PydanticSerializable] - - -def _serialize(s: SerializeAsAny[PydanticSerializable]) -> str: - value = s if isinstance(s, BaseModel) else JsonSerializer(root=s) - return value.model_dump_json() diff --git a/src/intelligence_layer/evaluation/run/file_run_repository.py b/src/intelligence_layer/evaluation/run/file_run_repository.py index 454f42a04..0a201c4e2 100644 --- a/src/intelligence_layer/evaluation/run/file_run_repository.py +++ b/src/intelligence_layer/evaluation/run/file_run_repository.py @@ -121,7 +121,7 @@ def _example_trace_path(self, run_id: str, example_id: str) -> Path: @staticmethod def _parse_log(log_path: Path) -> InMemoryTracer: - return FileTracer(log_path).trace() + return FileTracer(log_path).traces() def _example_output_path(self, run_id: str, example_id: str) -> Path: return (self._run_output_directory(run_id) / example_id).with_suffix(".json") diff --git a/tests/core/test_task.py b/tests/core/test_task.py index bdc2fa4e8..429b69c84 100644 --- a/tests/core/test_task.py +++ b/tests/core/test_task.py @@ -4,7 +4,7 @@ from time import sleep from typing import Callable -from intelligence_layer.core import InMemorySpan, InMemoryTracer, NoOpTracer, TaskSpan +from intelligence_layer.core import InMemoryTracer, NoOpTracer, TaskSpan from intelligence_layer.core.task import MAX_CONCURRENCY, Task @@ -111,21 +111,3 @@ def test_sub_tasks_do_not_introduce_multiple_task_spans() -> None: assert isinstance(tracer.entries[0], TaskSpan) assert tracer.entries[0].entries assert not isinstance(tracer.entries[0].entries[0], TaskSpan) - - -def test_ids_are_set_in_concurrent_run() -> None: - tracer = InMemoryTracer() - task = DeadlockDetector() - - task.run_concurrently([None] * MAX_CONCURRENCY, tracer, trace_id="ID") - assert tracer.entries - assert tracer.entries[0].id() == "ID" - - -def test_ids_are_equal_for_multiple_subtasks() -> None: - tracer = InMemoryTracer() - NestedTask().run(None, tracer, "ID") - assert isinstance(tracer.entries[0], InMemorySpan) - assert tracer.entries[0].id() == "ID" - assert isinstance(tracer.entries[0].entries[0], InMemorySpan) - assert tracer.entries[0].entries[0].id() == "ID" diff --git a/tests/core/test_temp.py b/tests/core/test_temp.py deleted file mode 100644 index e72415d54..000000000 --- a/tests/core/test_temp.py +++ /dev/null @@ -1,57 +0,0 @@ -import contextlib -import json -import os -import time -from pathlib import Path -from typing import Any, Iterator, Optional -from unittest.mock import Mock - -import pytest -import requests -from aleph_alpha_client import Prompt -from opentelemetry import trace -from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter -from opentelemetry.sdk.resources import SERVICE_NAME, Resource -from opentelemetry.sdk.trace import TracerProvider -from opentelemetry.sdk.trace.export import BatchSpanProcessor -from pytest import fixture - -from intelligence_layer.core import ( - CompleteInput, - CompleteOutput, - CompositeTracer, - FileTracer, - InMemorySpan, - InMemoryTaskSpan, - InMemoryTracer, - LogEntry, - LuminousControlModel, - OpenTelemetryTracer, - Task, - TaskSpan, - utc_now, -) -from intelligence_layer.core.tracer.persistent_tracer import TracerLogEntryFailed -from intelligence_layer.core.tracer.tracer import ErrorValue - -@fixture -def open_telemetry_tracer() -> tuple[str, OpenTelemetryTracer]: - service_name = "test-service" - url = "http://localhost:16686/api/traces?service=" + service_name - resource = 
Resource.create({SERVICE_NAME: service_name}) - provider = TracerProvider(resource=resource) - trace.set_tracer_provider(provider) - processor = BatchSpanProcessor(OTLPSpanExporter()) - provider.add_span_processor(processor) - openTracer = OpenTelemetryTracer(trace.get_tracer("intelligence-layer")) - return (url, openTracer) - - -def test_temp_1(open_telemetry_tracer): - print("test_1") - -def test_temp_2(open_telemetry_tracer): - print("test_2") - -def test_temp_3(open_telemetry_tracer): - print("test_3") \ No newline at end of file diff --git a/tests/core/tracer/test_file_tracer.py b/tests/core/tracer/test_file_tracer.py index ef820c9cd..c14ffba74 100644 --- a/tests/core/tracer/test_file_tracer.py +++ b/tests/core/tracer/test_file_tracer.py @@ -15,19 +15,6 @@ def file_tracer(tmp_path: Path) -> FileTracer: return FileTracer(tmp_path / "log.log") -def test_file_tracer_retrieves_correct_trace( - file_tracer: FileTracer, test_task: Task[str, str] -) -> None: - input = "input" - test_task.run(input, file_tracer) - expected_trace = file_tracer.trace() - test_task.run(input, file_tracer) - assert len(expected_trace.entries) == 1 - assert expected_trace.entries[0].context is not None - retrieved_trace = file_tracer.trace(expected_trace.entries[0].context.trace_id) - assert retrieved_trace.export_for_viewing() == expected_trace.export_for_viewing() - - def test_file_tracer_retrieves_all_file_traces( file_tracer: FileTracer, test_task: Task[str, str] ) -> None: @@ -35,8 +22,10 @@ def test_file_tracer_retrieves_all_file_traces( test_task.run(input, file_tracer) test_task.run(input, file_tracer) - traces = file_tracer.trace() + traces = file_tracer.traces() assert len(traces.entries) == 2 + assert isinstance(traces.entries[0], InMemoryTaskSpan) + assert isinstance(traces.entries[1], InMemoryTaskSpan) assert traces.entries[0].context.trace_id != traces.entries[1].context.trace_id @@ -66,7 +55,7 @@ def test_file_tracer_is_backwards_compatible() -> None: file_tracer = FileTracer( current_file_location.parent / "fixtures/old_file_trace_format.jsonl" ) - tracer = file_tracer.trace() + tracer = file_tracer.traces() assert len(tracer.entries) == 1 task_span = tracer.entries[0] diff --git a/tests/core/tracer/test_in_memory_tracer.py b/tests/core/tracer/test_in_memory_tracer.py index bc9c16695..cad71e985 100644 --- a/tests/core/tracer/test_in_memory_tracer.py +++ b/tests/core/tracer/test_in_memory_tracer.py @@ -20,13 +20,15 @@ def test_trace_id_exists_for_all_children_of_task_span() -> None: parent_span = tracer.task_span("child", "input") parent_span.span("child2") - assert isinstance(tracer.entries[0], InMemorySpan) + assert isinstance(tracer.entries[0], InMemoryTaskSpan) + assert isinstance(tracer.entries[0].entries[0], InMemorySpan) assert ( tracer.entries[0].entries[0].context.trace_id == tracer.entries[0].context.trace_id ) parent_span.task_span("child3", "input") + assert isinstance(tracer.entries[0].entries[1], InMemoryTaskSpan) assert ( tracer.entries[0].entries[1].context.trace_id == tracer.entries[0].context.trace_id @@ -39,12 +41,14 @@ def test_trace_id_exists_for_all_children_of_span() -> None: parent_span.span("child2") assert isinstance(tracer.entries[0], InMemorySpan) + assert isinstance(tracer.entries[0].entries[0], InMemorySpan) assert ( tracer.entries[0].entries[0].context.trace_id == tracer.entries[0].context.trace_id ) parent_span.task_span("child3", "input") + assert isinstance(tracer.entries[0].entries[1], InMemorySpan) assert ( tracer.entries[0].entries[1].context.trace_id == 
tracer.entries[0].context.trace_id diff --git a/tests/core/tracer/test_open_telemetry_tracer.py b/tests/core/tracer/test_open_telemetry_tracer.py index e1e1b136a..c326ec9b4 100644 --- a/tests/core/tracer/test_open_telemetry_tracer.py +++ b/tests/core/tracer/test_open_telemetry_tracer.py @@ -1,6 +1,6 @@ import json import time -from typing import Any +from typing import Any, Sequence from uuid import uuid4 import pytest @@ -17,13 +17,14 @@ from pytest import fixture from intelligence_layer.core import OpenTelemetryTracer, Task +from intelligence_layer.core.tracer.tracer import SpanType class DummyExporter(SpanExporter): def __init__(self) -> None: self.spans: list[ReadableSpan] = [] - def export(self, spans: trace.Sequence[ReadableSpan]) -> SpanExportResult: + def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: self.spans.extend(spans) return SpanExportResult.SUCCESS @@ -91,26 +92,37 @@ def test_open_telemetry_tracer_sets_attributes_correctly( spans = exporter.spans assert len(spans) == 4 spans_sorted_by_start: list[ReadableSpan] = sorted( - spans, key=lambda span: span.start_time + spans, key=lambda span: span.start_time if span.start_time else 0 ) - + assert spans_sorted_by_start[0].attributes is not None assert spans_sorted_by_start[0].name == "TestTask" assert spans_sorted_by_start[0].attributes["input"] == '"test-input"' assert spans_sorted_by_start[0].attributes["output"] == '"output"' + assert spans_sorted_by_start[0].attributes["type"] == SpanType.TASK_SPAN.value + assert spans_sorted_by_start[0].status.is_ok + assert spans_sorted_by_start[1].attributes is not None assert spans_sorted_by_start[1].name == "span" assert "input" not in spans_sorted_by_start[1].attributes.keys() + assert spans_sorted_by_start[1].attributes["type"] == SpanType.SPAN.value + assert spans_sorted_by_start[1].status.is_ok + assert spans_sorted_by_start[2].attributes is not None assert spans_sorted_by_start[2].name == "TestSubTask" assert spans_sorted_by_start[2].attributes["input"] == "null" assert spans_sorted_by_start[2].attributes["output"] == "null" + assert spans_sorted_by_start[2].attributes["type"] == SpanType.TASK_SPAN.value + assert spans_sorted_by_start[2].status.is_ok + assert spans_sorted_by_start[3].attributes is not None assert spans_sorted_by_start[3].name == "TestSubTask" assert spans_sorted_by_start[3].attributes["input"] == "null" assert spans_sorted_by_start[3].attributes["output"] == "null" + assert spans_sorted_by_start[3].attributes["type"] == SpanType.TASK_SPAN.value + assert spans_sorted_by_start[3].status.is_ok spans_sorted_by_end: list[ReadableSpan] = sorted( - spans_sorted_by_start, key=lambda span: span.end_time + spans_sorted_by_start, key=lambda span: span.end_time if span.end_time else 0 ) assert spans_sorted_by_end[0] == spans_sorted_by_start[2] @@ -119,6 +131,20 @@ def test_open_telemetry_tracer_sets_attributes_correctly( assert spans_sorted_by_end[3] == spans_sorted_by_start[0] +def test_open_telemetry_tracer_logs_error_code_correctly( + test_opentelemetry_tracer: OpenTelemetryTracer, + exporter: DummyExporter, + test_task: Task[str, str], +) -> None: + with pytest.raises(ValueError): + with test_opentelemetry_tracer.span("failing task"): + raise ValueError("my bad, sorry") + + spans = exporter.spans + assert len(spans) == 1 + assert not spans[0].status.is_ok + + def has_span_with_input(trace: Any, input_value: str) -> bool: return any( tag["key"] == "input" and tag["value"] == f'"{input_value}"' @@ -143,7 +169,7 @@ def 
test_open_telemetry_tracer_works_with_jaeger( input_value = str(uuid4()) test_task.run(input_value, jaeger_compatible_tracer) # the processor needs time to submit the trace to jaeger - time.sleep(0.3) + time.sleep(1) res = get_current_traces(url) diff --git a/tests/core/tracer/test_tracer.py b/tests/core/tracer/test_tracer.py index 25d5b091c..fc0d804d6 100644 --- a/tests/core/tracer/test_tracer.py +++ b/tests/core/tracer/test_tracer.py @@ -18,7 +18,9 @@ class DummyObject(BaseModel): @fixture -def composite_tracer(in_memory_tracer: InMemoryTracer, file_tracer: FileTracer): +def composite_tracer( + in_memory_tracer: InMemoryTracer, file_tracer: FileTracer +) -> CompositeTracer[Tracer]: return CompositeTracer(tracers=[in_memory_tracer, file_tracer]) diff --git a/tests/evaluation/test_domain.py b/tests/evaluation/test_domain.py index e4db9d584..df2ef0edc 100644 --- a/tests/evaluation/test_domain.py +++ b/tests/evaluation/test_domain.py @@ -1,6 +1,7 @@ from pytest import raises -from intelligence_layer.core import InMemorySpan, InMemoryTaskSpan, LogEntry, utc_now +from intelligence_layer.core import utc_now +from intelligence_layer.core.tracer.in_memory_tracer import InMemoryTracer from intelligence_layer.evaluation import ( AggregationOverview, EvaluationFailed, @@ -14,22 +15,13 @@ def test_to_trace_entry() -> None: now = utc_now() - entry = _to_trace_entry( - InMemoryTaskSpan( - name="task", - input="input", - output="output", - start_timestamp=now, - end_timestamp=now, - entries=[ - LogEntry(message="message", value="value", trace_id="ID"), - InMemorySpan( - name="span", start_timestamp=now, end_timestamp=now, trace_id="ID" - ), - ], - trace_id="ID", - ) - ) + span = InMemoryTracer().task_span("task", timestamp=now, input="input") + span.span("span", now).end(now) + span.log(message="message", value="value", timestamp=now) + span.record_output("output") + span.end(now) + + entry = _to_trace_entry(span) assert entry == TaskSpanTrace( name="task", @@ -38,8 +30,8 @@ def test_to_trace_entry() -> None: start=now, end=now, traces=[ - LogTrace(message="message", value="value"), SpanTrace(name="span", traces=[], start=now, end=now), + LogTrace(message="message", value="value"), ], ) @@ -49,7 +41,10 @@ def test_deserialize_task_trace() -> None: name="task", start=utc_now(), end=utc_now(), - traces=[], + traces=[ + SpanTrace(name="span", traces=[], start=utc_now(), end=utc_now()), + LogTrace(message="message", value="value"), + ], input=[{"a": "b"}], output=["c"], )
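
A minimal usage sketch of the renamed trace-retrieval API touched by this commit: `FileTracer.trace()` becomes `FileTracer.traces()` and rebuilds an `InMemoryTracer` from the log file. The log-file path, span name, and logged values below are illustrative assumptions, not part of the commit.

from pathlib import Path

from intelligence_layer.core import FileTracer, InMemoryTaskSpan

# Illustrative log-file location; FileTracer appends one JSON log line per event.
tracer = FileTracer(Path("trace_demo.jsonl"))

# Record a single root task span with a nested log entry and an output.
with tracer.task_span("demo-task", input="some input") as task_span:
    task_span.log("step", {"detail": "value"})
    task_span.record_output("some output")

# traces() (formerly trace()) replays the whole file into an InMemoryTracer,
# with one entry per recorded root span.
replayed = tracer.traces()
for entry in replayed.entries:
    if isinstance(entry, InMemoryTaskSpan):
        print(entry.name, entry.context.trace_id)

Package-level imports are unaffected by the module moves in this patch, since `intelligence_layer/core/__init__.py` continues to re-export `LogEntry`, `LogLine`, `StartTask`, `EndTask`, `StartSpan`, `EndSpan`, and `PlainEntry` from their new locations.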