diff --git a/Concepts.md b/Concepts.md index 005768d9e..bf1b077ca 100644 --- a/Concepts.md +++ b/Concepts.md @@ -31,7 +31,7 @@ class Task(ABC, Generic[Input, Output]): `Input` and `Output` are normal Python datatypes that can be serialized from and to JSON. For this the Intelligence Layer relies on [Pydantic](https://docs.pydantic.dev/). The types that can actually be used are defined in form -of the type-alias [`PydanticSerializable`](src/intelligence_layer/core/tracer.py#L44). +of the type-alias [`PydanticSerializable`](src/intelligence_layer/core/tracer/tracer.py#L44). The second parameter `task_span` is used for [tracing](#Trace) which is described below. diff --git a/src/intelligence_layer/core/__init__.py b/src/intelligence_layer/core/__init__.py index 42fd31c22..e7e7a5837 100644 --- a/src/intelligence_layer/core/__init__.py +++ b/src/intelligence_layer/core/__init__.py @@ -41,20 +41,20 @@ from .text_highlight import TextHighlight as TextHighlight from .text_highlight import TextHighlightInput as TextHighlightInput from .text_highlight import TextHighlightOutput as TextHighlightOutput -from .tracer import CompositeTracer as CompositeTracer -from .tracer import FileSpan as FileSpan -from .tracer import FileTaskSpan as FileTaskSpan -from .tracer import FileTracer as FileTracer -from .tracer import InMemorySpan as InMemorySpan -from .tracer import InMemoryTaskSpan as InMemoryTaskSpan -from .tracer import InMemoryTracer as InMemoryTracer -from .tracer import LogEntry as LogEntry -from .tracer import NoOpTracer as NoOpTracer -from .tracer import OpenTelemetryTracer as OpenTelemetryTracer -from .tracer import PydanticSerializable as PydanticSerializable -from .tracer import Span as Span -from .tracer import TaskSpan as TaskSpan -from .tracer import Tracer as Tracer -from .tracer import utc_now as utc_now +from .tracer.composite_tracer import CompositeTracer as CompositeTracer +from .tracer.in_memory_tracer import InMemorySpan as InMemorySpan +from .tracer.in_memory_tracer import InMemoryTaskSpan as InMemoryTaskSpan +from .tracer.in_memory_tracer import InMemoryTracer as InMemoryTracer +from .tracer.open_telemetry_tracer import OpenTelemetryTracer as OpenTelemetryTracer +from .tracer.tracer import FileSpan as FileSpan +from .tracer.tracer import FileTaskSpan as FileTaskSpan +from .tracer.tracer import FileTracer as FileTracer +from .tracer.tracer import LogEntry as LogEntry +from .tracer.tracer import NoOpTracer as NoOpTracer +from .tracer.tracer import PydanticSerializable as PydanticSerializable +from .tracer.tracer import Span as Span +from .tracer.tracer import TaskSpan as TaskSpan +from .tracer.tracer import Tracer as Tracer +from .tracer.tracer import utc_now as utc_now __all__ = [symbol for symbol in dir()] diff --git a/src/intelligence_layer/core/tracer/__init__.py b/src/intelligence_layer/core/tracer/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/intelligence_layer/core/tracer/composite_tracer.py b/src/intelligence_layer/core/tracer/composite_tracer.py new file mode 100644 index 000000000..0294a883e --- /dev/null +++ b/src/intelligence_layer/core/tracer/composite_tracer.py @@ -0,0 +1,108 @@ +from datetime import datetime +from typing import Generic, Optional, Sequence + +from intelligence_layer.core import ( + Chunk, + InMemoryTracer, + PydanticSerializable, + Span, + TaskSpan, + Tracer, + utc_now, +) +from intelligence_layer.core.tracer.tracer import SpanVar, TracerVar +from intelligence_layer.use_cases import ClassifyInput, PromptBasedClassify + + +class CompositeTracer(Tracer, Generic[TracerVar]): + """A :class:`Tracer` that allows for recording to multiple tracers simultaneously. + + Each log-entry and span will be forwarded to all subtracers. + + Args: + tracers: tracers that will be forwarded all subsequent log and span calls. + + Example: + >>> from intelligence_layer.core import InMemoryTracer, FileTracer, CompositeTracer, Chunk + >>> from intelligence_layer.use_cases import PromptBasedClassify, ClassifyInput + + >>> tracer_1 = InMemoryTracer() + >>> tracer_2 = InMemoryTracer() + >>> tracer = CompositeTracer([tracer_1, tracer_2]) + >>> task = PromptBasedClassify() + >>> response = task.run(ClassifyInput(chunk=Chunk("Cool"), labels=frozenset({"label", "other label"})), tracer) + """ + + def __init__(self, tracers: Sequence[TracerVar]) -> None: + assert len(tracers) > 0 + self.tracers = tracers + + def span( + self, + name: str, + timestamp: Optional[datetime] = None, + trace_id: Optional[str] = None, + ) -> "CompositeSpan[Span]": + timestamp = timestamp or utc_now() + trace_id = self.ensure_id(trace_id) + return CompositeSpan( + [tracer.span(name, timestamp, trace_id) for tracer in self.tracers] + ) + + def task_span( + self, + task_name: str, + input: PydanticSerializable, + timestamp: Optional[datetime] = None, + trace_id: Optional[str] = None, + ) -> "CompositeTaskSpan": + timestamp = timestamp or utc_now() + trace_id = self.ensure_id(trace_id) + return CompositeTaskSpan( + [ + tracer.task_span(task_name, input, timestamp, trace_id) + for tracer in self.tracers + ] + ) + + +class CompositeSpan(Generic[SpanVar], CompositeTracer[SpanVar], Span): + """A :class:`Span` that allows for recording to multiple spans simultaneously. + + Each log-entry and span will be forwarded to all subspans. + + Args: + tracers: spans that will be forwarded all subsequent log and span calls. + """ + + def id(self) -> str: + return self.tracers[0].id() + + def log( + self, + message: str, + value: PydanticSerializable, + timestamp: Optional[datetime] = None, + ) -> None: + timestamp = timestamp or utc_now() + for tracer in self.tracers: + tracer.log(message, value, timestamp) + + def end(self, timestamp: Optional[datetime] = None) -> None: + timestamp = timestamp or utc_now() + for tracer in self.tracers: + tracer.end(timestamp) + + +class CompositeTaskSpan(CompositeSpan[TaskSpan], TaskSpan): + """A :class:`TaskSpan` that allows for recording to multiple TaskSpans simultaneously. + + Each log-entry and span will be forwarded to all subspans. + + Args: + tracers: task spans that will be forwarded all subsequent log and span calls. + """ + + def record_output(self, output: PydanticSerializable) -> None: + for tracer in self.tracers: + tracer.record_output(output) diff --git a/src/intelligence_layer/core/tracer/in_memory_tracer.py b/src/intelligence_layer/core/tracer/in_memory_tracer.py new file mode 100644 index 000000000..63ab1d4a3 --- /dev/null +++ b/src/intelligence_layer/core/tracer/in_memory_tracer.py @@ -0,0 +1,134 @@ +from datetime import datetime +from typing import Optional, Union + +from pydantic import BaseModel, Field, SerializeAsAny +from rich.tree import Tree + +from intelligence_layer.core import ( + LogEntry, + PydanticSerializable, + Span, + TaskSpan, + Tracer, + utc_now, +) +from intelligence_layer.core.tracer.tracer import _render_log_value + + +class InMemoryTracer(BaseModel, Tracer): + """Collects log entries in a nested structure, and keeps them in memory. + + If desired, the structure is serializable with Pydantic, so you can write out the JSON + representation to a file, or return via an API, or something similar. + + Attributes: + name: A descriptive name of what the tracer contains log entries about. + entries: A sequential list of log entries and/or nested InMemoryTracers with their own + log entries. + """ + + entries: list[Union[LogEntry, "InMemoryTaskSpan", "InMemorySpan"]] = [] + + def span( + self, + name: str, + timestamp: Optional[datetime] = None, + trace_id: Optional[str] = None, + ) -> "InMemorySpan": + child = InMemorySpan( + name=name, + start_timestamp=timestamp or utc_now(), + trace_id=self.ensure_id(trace_id), + ) + self.entries.append(child) + return child + + def task_span( + self, + task_name: str, + input: PydanticSerializable, + timestamp: Optional[datetime] = None, + trace_id: Optional[str] = None, + ) -> "InMemoryTaskSpan": + child = InMemoryTaskSpan( + name=task_name, + input=input, + start_timestamp=timestamp or utc_now(), + trace_id=self.ensure_id(trace_id), + ) + self.entries.append(child) + return child + + def _rich_render_(self) -> Tree: + """Renders the trace via classes in the `rich` package""" + tree = Tree(label="Trace") + + for log in self.entries: + tree.add(log._rich_render_()) + + return tree + + def _ipython_display_(self) -> None: + """Default rendering for Jupyter notebooks""" + from rich import print + + print(self._rich_render_()) + + +class InMemorySpan(InMemoryTracer, Span): + name: str + start_timestamp: datetime = Field(default_factory=datetime.utcnow) + end_timestamp: Optional[datetime] = None + trace_id: str + + def id(self) -> str: + return self.trace_id + + def log( + self, + message: str, + value: PydanticSerializable, + timestamp: Optional[datetime] = None, + ) -> None: + self.entries.append( + LogEntry( + message=message, + value=value, + timestamp=timestamp or utc_now(), + trace_id=self.id(), + ) + ) + + def end(self, timestamp: Optional[datetime] = None) -> None: + if not self.end_timestamp: + self.end_timestamp = timestamp or utc_now() + + def _rich_render_(self) -> Tree: + """Renders the trace via classes in the `rich` package""" + tree = Tree(label=self.name) + + for log in self.entries: + tree.add(log._rich_render_()) + + return tree + + +class InMemoryTaskSpan(InMemorySpan, TaskSpan): + input: SerializeAsAny[PydanticSerializable] + output: Optional[SerializeAsAny[PydanticSerializable]] = None + + def record_output(self, output: PydanticSerializable) -> None: + self.output = output + + def _rich_render_(self) -> Tree: + """Renders the trace via classes in the `rich` package""" + tree = Tree(label=self.name) + + tree.add(_render_log_value(self.input, "Input")) + + for log in self.entries: + tree.add(log._rich_render_()) + + tree.add(_render_log_value(self.output, "Output")) + + return tree diff --git a/src/intelligence_layer/core/tracer/open_telemetry_tracer.py b/src/intelligence_layer/core/tracer/open_telemetry_tracer.py new file mode 100644 index 000000000..82da828c5 --- /dev/null +++ b/src/intelligence_layer/core/tracer/open_telemetry_tracer.py @@ -0,0 +1,109 @@ +from datetime import datetime +from typing import Optional + +from opentelemetry.context import attach, detach +from opentelemetry.trace import Span as OpenTSpan +from opentelemetry.trace import Tracer as OpenTTracer +from opentelemetry.trace import set_span_in_context + +from intelligence_layer.core.tracer.tracer import ( + PydanticSerializable, + Span, + TaskSpan, + Tracer, + _serialize, +) + + +class OpenTelemetryTracer(Tracer): + """A `Tracer` that uses open telemetry.""" + + def __init__(self, tracer: OpenTTracer) -> None: + self._tracer = tracer + + def span( + self, + name: str, + timestamp: Optional[datetime] = None, + trace_id: Optional[str] = None, + ) -> "OpenTelemetrySpan": + trace_id = self.ensure_id(trace_id) + tracer_span = self._tracer.start_span( + name, + attributes={"trace_id": trace_id}, + start_time=None if not timestamp else _open_telemetry_timestamp(timestamp), + ) + token = attach(set_span_in_context(tracer_span)) + return OpenTelemetrySpan(tracer_span, self._tracer, token, trace_id) + + def task_span( + self, + task_name: str, + input: PydanticSerializable, + timestamp: Optional[datetime] = None, + trace_id: Optional[str] = None, + ) -> "OpenTelemetryTaskSpan": + trace_id = self.ensure_id(trace_id) + + tracer_span = self._tracer.start_span( + task_name, + attributes={"input": _serialize(input), "trace_id": trace_id}, + start_time=None if not timestamp else _open_telemetry_timestamp(timestamp), + ) + token = attach(set_span_in_context(tracer_span)) + return OpenTelemetryTaskSpan(tracer_span, self._tracer, token, trace_id) + + +class OpenTelemetrySpan(Span, OpenTelemetryTracer): + """A `Span` created by `OpenTelemetryTracer.span`.""" + + end_timestamp: Optional[datetime] = None + + def id(self) -> str: + return self._trace_id + + def __init__( + self, span: OpenTSpan, tracer: OpenTTracer, token: object, trace_id: str + ) -> None: + super().__init__(tracer) + self.open_ts_span = span + self._token = token + self._trace_id = trace_id + + def log( + self, + message: str, + value: PydanticSerializable, + timestamp: Optional[datetime] = None, + ) -> None: + self.open_ts_span.add_event( + message, + {"value": _serialize(value), "trace_id": self.id()}, + None if not timestamp else _open_telemetry_timestamp(timestamp), + ) + + def end(self, timestamp: Optional[datetime] = None) -> None: + detach(self._token) + self.open_ts_span.end( + _open_telemetry_timestamp(timestamp) if timestamp is not None else None + ) + + +class OpenTelemetryTaskSpan(TaskSpan, OpenTelemetrySpan): + """A `TaskSpan` created by `OpenTelemetryTracer.task_span`.""" + + output: Optional[PydanticSerializable] = None + + def __init__( + self, span: OpenTSpan, tracer: OpenTTracer, token: object, trace_id: str + ) -> None: + super().__init__(span, tracer, token, trace_id) + + def record_output(self, output: PydanticSerializable) -> None: + self.open_ts_span.set_attribute("output", _serialize(output)) + + +def _open_telemetry_timestamp(t: datetime) -> int: + # Open telemetry expects *nanoseconds* since epoch + t_float = t.timestamp() * 1e9 + return int(t_float) diff --git a/src/intelligence_layer/core/tracer/persistent_tracer.py b/src/intelligence_layer/core/tracer/persistent_tracer.py new file mode 100644 index 000000000..ce7587650 --- /dev/null +++ b/src/intelligence_layer/core/tracer/persistent_tracer.py @@ -0,0 +1,129 @@ +from abc import ABC, abstractmethod +from datetime import datetime +from typing import Iterable, Optional +from uuid import uuid4 + +from pydantic import BaseModel + +from intelligence_layer.core import ( + InMemoryTracer, + PydanticSerializable, + Span, + TaskSpan, + Tracer, + utc_now, +) +from intelligence_layer.core.tracer.tracer import ( + EndSpan, + EndTask, + LogLine, + PlainEntry, + StartSpan, + StartTask, + TreeBuilder, +) + + +class PersistentTracer(Tracer, ABC): + def __init__(self) -> None: + self.uuid = uuid4() + + @abstractmethod + def _log_entry(self, id: str, entry: BaseModel) -> None: + pass + + @abstractmethod + def trace(self, trace_id: str) -> InMemoryTracer: + pass + + def _log_span( + self, span: "PersistentSpan", name: str, timestamp: Optional[datetime] = None + ) -> None: + self._log_entry( + span.id(), + StartSpan( + uuid=span.uuid, + parent=self.uuid, + name=name, + start=timestamp or utc_now(), + trace_id=span.id(), + ), + ) + + def _log_task( + self, + task_span: "PersistentTaskSpan", + task_name: str, + input: PydanticSerializable, + timestamp: Optional[datetime] = None, + ) -> None: + self._log_entry( + task_span.id(), + StartTask( + uuid=task_span.uuid, + parent=self.uuid, + name=task_name, + start=timestamp or utc_now(), + input=input, + trace_id=task_span.id(), + ), + ) + + def _parse_log(self, log_entries: Iterable[LogLine]) -> InMemoryTracer: + tree_builder = TreeBuilder() + for log_line in log_entries: + if log_line.entry_type == StartTask.__name__: + tree_builder.start_task(log_line) + elif log_line.entry_type == EndTask.__name__: + tree_builder.end_task(log_line) + elif log_line.entry_type == StartSpan.__name__: + tree_builder.start_span(log_line) + elif log_line.entry_type == EndSpan.__name__: + tree_builder.end_span(log_line) + elif log_line.entry_type == PlainEntry.__name__: + tree_builder.plain_entry(log_line) + else: + raise RuntimeError(f"Unexpected entry_type in {log_line}") + assert tree_builder.root + return tree_builder.root + + +class PersistentSpan(Span, PersistentTracer): + end_timestamp: Optional[datetime] = None + + def log( + self, + message: str, + value: PydanticSerializable, + timestamp: Optional[datetime] = None, + ) -> None: + self._log_entry( + self.id(), + PlainEntry( + message=message, + value=value, + timestamp=timestamp or utc_now(), + parent=self.uuid, + trace_id=self.id(), + ), + ) + + def end(self, timestamp: Optional[datetime] = None) -> None: + if not self.end_timestamp: + self.end_timestamp = timestamp or utc_now() + self._log_entry(self.id(), EndSpan(uuid=self.uuid, end=self.end_timestamp)) + + +class PersistentTaskSpan(TaskSpan, PersistentSpan): + output: Optional[PydanticSerializable] = None + + def record_output(self, output: PydanticSerializable) -> None: + self.output = output + + def end(self, timestamp: Optional[datetime] = None) -> None: + if not self.end_timestamp: + self.end_timestamp = timestamp or utc_now() + self._log_entry( + self.id(), + EndTask(uuid=self.uuid, end=self.end_timestamp, output=self.output), + ) diff --git a/src/intelligence_layer/core/tracer.py b/src/intelligence_layer/core/tracer/tracer.py similarity index 57% rename from src/intelligence_layer/core/tracer.py rename to src/intelligence_layer/core/tracer/tracer.py index 78668324a..e590aa1d5 100644 --- a/src/intelligence_layer/core/tracer.py +++ b/src/intelligence_layer/core/tracer/tracer.py @@ -4,28 +4,21 @@ from json import loads from pathlib import Path from types import TracebackType -from typing import ( - TYPE_CHECKING, - Generic, - Iterable, - Mapping, - Optional, - Sequence, - TypeVar, - Union, -) +from typing import TYPE_CHECKING, Mapping, Optional, Sequence, TypeVar from uuid import UUID, uuid4 -from opentelemetry.context import attach, detach -from opentelemetry.trace import Span as OpenTSpan -from opentelemetry.trace import Tracer as OpenTTracer -from opentelemetry.trace import set_span_in_context from pydantic import BaseModel, Field, RootModel, SerializeAsAny from rich.panel import Panel from rich.syntax import Syntax -from rich.tree import Tree from typing_extensions import Self, TypeAliasType +from intelligence_layer.core import InMemorySpan, InMemoryTaskSpan, InMemoryTracer +from intelligence_layer.core.tracer.persistent_tracer import ( + PersistentSpan, + PersistentTaskSpan, + PersistentTracer, +) + if TYPE_CHECKING: PydanticSerializable = ( int @@ -233,104 +226,9 @@ def record_output(self, output: PydanticSerializable) -> None: TracerVar = TypeVar("TracerVar", bound=Tracer) - -class CompositeTracer(Tracer, Generic[TracerVar]): - """A :class:`Tracer` that allows for recording to multiple tracers simultaneously. - - Each log-entry and span will be forwarded to all subtracers. - - Args: - tracers: tracers that will be forwarded all subsequent log and span calls. - - Example: - >>> from intelligence_layer.core import InMemoryTracer, FileTracer, CompositeTracer, Chunk - >>> from intelligence_layer.use_cases import PromptBasedClassify, ClassifyInput - - >>> tracer_1 = InMemoryTracer() - >>> tracer_2 = InMemoryTracer() - >>> tracer = CompositeTracer([tracer_1, tracer_2]) - >>> task = PromptBasedClassify() - >>> response = task.run(ClassifyInput(chunk=Chunk("Cool"), labels=frozenset({"label", "other label"})), tracer) - """ - - def __init__(self, tracers: Sequence[TracerVar]) -> None: - assert len(tracers) > 0 - self.tracers = tracers - - def span( - self, - name: str, - timestamp: Optional[datetime] = None, - trace_id: Optional[str] = None, - ) -> "CompositeSpan[Span]": - timestamp = timestamp or utc_now() - trace_id = self.ensure_id(trace_id) - return CompositeSpan( - [tracer.span(name, timestamp, trace_id) for tracer in self.tracers] - ) - - def task_span( - self, - task_name: str, - input: PydanticSerializable, - timestamp: Optional[datetime] = None, - trace_id: Optional[str] = None, - ) -> "CompositeTaskSpan": - timestamp = timestamp or utc_now() - trace_id = self.ensure_id(trace_id) - return CompositeTaskSpan( - [ - tracer.task_span(task_name, input, timestamp, trace_id) - for tracer in self.tracers - ] - ) - - SpanVar = TypeVar("SpanVar", bound=Span) -class CompositeSpan(Generic[SpanVar], CompositeTracer[SpanVar], Span): - """A :class:`Span` that allows for recording to multiple spans simultaneously. - - Each log-entry and span will be forwarded to all subspans. - - Args: - tracers: spans that will be forwarded all subsequent log and span calls. - """ - - def id(self) -> str: - return self.tracers[0].id() - - def log( - self, - message: str, - value: PydanticSerializable, - timestamp: Optional[datetime] = None, - ) -> None: - timestamp = timestamp or utc_now() - for tracer in self.tracers: - tracer.log(message, value, timestamp) - - def end(self, timestamp: Optional[datetime] = None) -> None: - timestamp = timestamp or utc_now() - for tracer in self.tracers: - tracer.end(timestamp) - - -class CompositeTaskSpan(CompositeSpan[TaskSpan], TaskSpan): - """A :class:`TaskSpan` that allows for recording to multiple TaskSpans simultaneously. - - Each log-entry and span will be forwarded to all subspans. - - Args: - tracers: task spans that will be forwarded all subsequent log and span calls. - """ - - def record_output(self, output: PydanticSerializable) -> None: - for tracer in self.tracers: - tracer.record_output(output) - - class NoOpTracer(TaskSpan): """A no-op tracer. @@ -423,125 +321,6 @@ def _ipython_display_(self) -> None: print(self._rich_render_()) -class InMemoryTracer(BaseModel, Tracer): - """Collects log entries in a nested structure, and keeps them in memory. - - If desired, the structure is serializable with Pydantic, so you can write out the JSON - representation to a file, or return via an API, or something similar. - - Attributes: - name: A descriptive name of what the tracer contains log entries about. - entries: A sequential list of log entries and/or nested InMemoryTracers with their own - log entries. - """ - - entries: list[Union[LogEntry, "InMemoryTaskSpan", "InMemorySpan"]] = [] - - def span( - self, - name: str, - timestamp: Optional[datetime] = None, - trace_id: Optional[str] = None, - ) -> "InMemorySpan": - child = InMemorySpan( - name=name, - start_timestamp=timestamp or utc_now(), - trace_id=self.ensure_id(trace_id), - ) - self.entries.append(child) - return child - - def task_span( - self, - task_name: str, - input: PydanticSerializable, - timestamp: Optional[datetime] = None, - trace_id: Optional[str] = None, - ) -> "InMemoryTaskSpan": - child = InMemoryTaskSpan( - name=task_name, - input=input, - start_timestamp=timestamp or utc_now(), - trace_id=self.ensure_id(trace_id), - ) - self.entries.append(child) - return child - - def _rich_render_(self) -> Tree: - """Renders the trace via classes in the `rich` package""" - tree = Tree(label="Trace") - - for log in self.entries: - tree.add(log._rich_render_()) - - return tree - - def _ipython_display_(self) -> None: - """Default rendering for Jupyter notebooks""" - from rich import print - - print(self._rich_render_()) - - -class InMemorySpan(InMemoryTracer, Span): - name: str - start_timestamp: datetime = Field(default_factory=datetime.utcnow) - end_timestamp: Optional[datetime] = None - trace_id: str - - def id(self) -> str: - return self.trace_id - - def log( - self, - message: str, - value: PydanticSerializable, - timestamp: Optional[datetime] = None, - ) -> None: - self.entries.append( - LogEntry( - message=message, - value=value, - timestamp=timestamp or utc_now(), - trace_id=self.id(), - ) - ) - - def end(self, timestamp: Optional[datetime] = None) -> None: - if not self.end_timestamp: - self.end_timestamp = timestamp or utc_now() - - def _rich_render_(self) -> Tree: - """Renders the trace via classes in the `rich` package""" - tree = Tree(label=self.name) - - for log in self.entries: - tree.add(log._rich_render_()) - - return tree - - -class InMemoryTaskSpan(InMemorySpan, TaskSpan): - input: SerializeAsAny[PydanticSerializable] - output: Optional[SerializeAsAny[PydanticSerializable]] = None - - def record_output(self, output: PydanticSerializable) -> None: - self.output = output - - def _rich_render_(self) -> Tree: - """Renders the trace via classes in the `rich` package""" - tree = Tree(label=self.name) - - tree.add(_render_log_value(self.input, "Input")) - - for log in self.entries: - tree.add(log._rich_render_()) - - tree.add(_render_log_value(self.output, "Output")) - - return tree - - # Required for sphinx, see also: https://docs.pydantic.dev/2.4/errors/usage_errors/#class-not-fully-defined InMemorySpan.model_rebuild() InMemoryTracer.model_rebuild() @@ -702,111 +481,6 @@ def plain_entry(self, log_line: LogLine) -> None: TaskSpanVar = TypeVar("TaskSpanVar", bound=TaskSpan) -class PersistentTracer(Tracer, ABC): - def __init__(self) -> None: - self.uuid = uuid4() - - @abstractmethod - def _log_entry(self, id: str, entry: BaseModel) -> None: - pass - - @abstractmethod - def trace(self, trace_id: str) -> InMemoryTracer: - pass - - def _log_span( - self, span: "PersistentSpan", name: str, timestamp: Optional[datetime] = None - ) -> None: - self._log_entry( - span.id(), - StartSpan( - uuid=span.uuid, - parent=self.uuid, - name=name, - start=timestamp or utc_now(), - trace_id=span.id(), - ), - ) - - def _log_task( - self, - task_span: "PersistentTaskSpan", - task_name: str, - input: PydanticSerializable, - timestamp: Optional[datetime] = None, - ) -> None: - self._log_entry( - task_span.id(), - StartTask( - uuid=task_span.uuid, - parent=self.uuid, - name=task_name, - start=timestamp or utc_now(), - input=input, - trace_id=task_span.id(), - ), - ) - - def _parse_log(self, log_entries: Iterable[LogLine]) -> InMemoryTracer: - tree_builder = TreeBuilder() - for log_line in log_entries: - if log_line.entry_type == StartTask.__name__: - tree_builder.start_task(log_line) - elif log_line.entry_type == EndTask.__name__: - tree_builder.end_task(log_line) - elif log_line.entry_type == StartSpan.__name__: - tree_builder.start_span(log_line) - elif log_line.entry_type == EndSpan.__name__: - tree_builder.end_span(log_line) - elif log_line.entry_type == PlainEntry.__name__: - tree_builder.plain_entry(log_line) - else: - raise RuntimeError(f"Unexpected entry_type in {log_line}") - assert tree_builder.root - return tree_builder.root - - -class PersistentSpan(Span, PersistentTracer): - end_timestamp: Optional[datetime] = None - - def log( - self, - message: str, - value: PydanticSerializable, - timestamp: Optional[datetime] = None, - ) -> None: - self._log_entry( - self.id(), - PlainEntry( - message=message, - value=value, - timestamp=timestamp or utc_now(), - parent=self.uuid, - trace_id=self.id(), - ), - ) - - def end(self, timestamp: Optional[datetime] = None) -> None: - if not self.end_timestamp: - self.end_timestamp = timestamp or utc_now() - self._log_entry(self.id(), EndSpan(uuid=self.uuid, end=self.end_timestamp)) - - -class PersistentTaskSpan(TaskSpan, PersistentSpan): - output: Optional[PydanticSerializable] = None - - def record_output(self, output: PydanticSerializable) -> None: - self.output = output - - def end(self, timestamp: Optional[datetime] = None) -> None: - if not self.end_timestamp: - self.end_timestamp = timestamp or utc_now() - self._log_entry( - self.id(), - EndTask(uuid=self.uuid, end=self.end_timestamp, output=self.output), - ) - - class FileTracer(PersistentTracer): """A `Tracer` that logs to a file. @@ -896,97 +570,3 @@ def __init__( def _serialize(s: SerializeAsAny[PydanticSerializable]) -> str: value = s if isinstance(s, BaseModel) else JsonSerializer(root=s) return value.model_dump_json() - - -def _open_telemetry_timestamp(t: datetime) -> int: - # Open telemetry expects *nanoseconds* since epoch - t_float = t.timestamp() * 1e9 - return int(t_float) - - -class OpenTelemetryTracer(Tracer): - """A `Tracer` that uses open telemetry.""" - - def __init__(self, tracer: OpenTTracer) -> None: - self._tracer = tracer - - def span( - self, - name: str, - timestamp: Optional[datetime] = None, - trace_id: Optional[str] = None, - ) -> "OpenTelemetrySpan": - trace_id = self.ensure_id(trace_id) - tracer_span = self._tracer.start_span( - name, - attributes={"trace_id": trace_id}, - start_time=None if not timestamp else _open_telemetry_timestamp(timestamp), - ) - token = attach(set_span_in_context(tracer_span)) - return OpenTelemetrySpan(tracer_span, self._tracer, token, trace_id) - - def task_span( - self, - task_name: str, - input: PydanticSerializable, - timestamp: Optional[datetime] = None, - trace_id: Optional[str] = None, - ) -> "OpenTelemetryTaskSpan": - trace_id = self.ensure_id(trace_id) - - tracer_span = self._tracer.start_span( - task_name, - attributes={"input": _serialize(input), "trace_id": trace_id}, - start_time=None if not timestamp else _open_telemetry_timestamp(timestamp), - ) - token = attach(set_span_in_context(tracer_span)) - return OpenTelemetryTaskSpan(tracer_span, self._tracer, token, trace_id) - - -class OpenTelemetrySpan(Span, OpenTelemetryTracer): - """A `Span` created by `OpenTelemetryTracer.span`.""" - - end_timestamp: Optional[datetime] = None - - def id(self) -> str: - return self._trace_id - - def __init__( - self, span: OpenTSpan, tracer: OpenTTracer, token: object, trace_id: str - ) -> None: - super().__init__(tracer) - self.open_ts_span = span - self._token = token - self._trace_id = trace_id - - def log( - self, - message: str, - value: PydanticSerializable, - timestamp: Optional[datetime] = None, - ) -> None: - self.open_ts_span.add_event( - message, - {"value": _serialize(value), "trace_id": self.id()}, - None if not timestamp else _open_telemetry_timestamp(timestamp), - ) - - def end(self, timestamp: Optional[datetime] = None) -> None: - detach(self._token) - self.open_ts_span.end( - _open_telemetry_timestamp(timestamp) if timestamp is not None else None - ) - - -class OpenTelemetryTaskSpan(TaskSpan, OpenTelemetrySpan): - """A `TaskSpan` created by `OpenTelemetryTracer.task_span`.""" - - output: Optional[PydanticSerializable] = None - - def __init__( - self, span: OpenTSpan, tracer: OpenTTracer, token: object, trace_id: str - ) -> None: - super().__init__(span, tracer, token, trace_id) - - def record_output(self, output: PydanticSerializable) -> None: - self.open_ts_span.set_attribute("output", _serialize(output)) diff --git a/src/intelligence_layer/use_cases/qa/multiple_chunk_qa.py b/src/intelligence_layer/use_cases/qa/multiple_chunk_qa.py index e4de24409..d2726eae6 100644 --- a/src/intelligence_layer/use_cases/qa/multiple_chunk_qa.py +++ b/src/intelligence_layer/use_cases/qa/multiple_chunk_qa.py @@ -2,6 +2,7 @@ from pydantic import BaseModel +from intelligence_layer.core import Task, TaskSpan from intelligence_layer.core.chunk import Chunk from intelligence_layer.core.detect_language import Language, language_config from intelligence_layer.core.model import ( @@ -10,8 +11,6 @@ ControlModel, LuminousControlModel, ) -from intelligence_layer.core.task import Task -from intelligence_layer.core.tracer import TaskSpan from intelligence_layer.use_cases.qa.single_chunk_qa import ( SingleChunkQa, SingleChunkQaInput, @@ -128,8 +127,7 @@ class MultipleChunkQa(Task[MultipleChunkQaInput, MultipleChunkQaOutput]): >>> from intelligence_layer.connectors import ( ... LimitedConcurrencyClient, ... ) - >>> from intelligence_layer.core import Language - >>> from intelligence_layer.core import InMemoryTracer + >>> from intelligence_layer.core import Language, InMemoryTracer >>> from intelligence_layer.core.chunk import Chunk >>> from intelligence_layer.use_cases import ( ... MultipleChunkQa, diff --git a/src/intelligence_layer/use_cases/qa/single_chunk_qa.py b/src/intelligence_layer/use_cases/qa/single_chunk_qa.py index 7b7aeff88..d1b97bdef 100644 --- a/src/intelligence_layer/use_cases/qa/single_chunk_qa.py +++ b/src/intelligence_layer/use_cases/qa/single_chunk_qa.py @@ -3,6 +3,7 @@ from liquid import Template from pydantic import BaseModel +from intelligence_layer.core import Task, TaskSpan from intelligence_layer.core.chunk import Chunk from intelligence_layer.core.detect_language import Language, language_config from intelligence_layer.core.model import ( @@ -12,9 +13,7 @@ LuminousControlModel, ) from intelligence_layer.core.prompt_template import RichPrompt -from intelligence_layer.core.task import Task from intelligence_layer.core.text_highlight import TextHighlight, TextHighlightInput -from intelligence_layer.core.tracer import TaskSpan class QaSetup(BaseModel): @@ -89,8 +88,7 @@ class SingleChunkQa(Task[SingleChunkQaInput, SingleChunkQaOutput]): Example: >>> import os - >>> from intelligence_layer.core import Language - >>> from intelligence_layer.core import InMemoryTracer + >>> from intelligence_layer.core import Language, InMemoryTracer >>> from intelligence_layer.core import Chunk >>> from intelligence_layer.use_cases import SingleChunkQa, SingleChunkQaInput >>>