Skip to content

Commit

Permalink
Documentation and test fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
NiklasKoehneckeAA authored and FlorianSchepersAA committed May 23, 2024
1 parent 65e50dc commit 9e41a1b
Show file tree
Hide file tree
Showing 16 changed files with 354 additions and 423 deletions.
14 changes: 7 additions & 7 deletions src/intelligence_layer/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,22 +44,22 @@
from .tracer.in_memory_tracer import InMemorySpan as InMemorySpan
from .tracer.in_memory_tracer import InMemoryTaskSpan as InMemoryTaskSpan
from .tracer.in_memory_tracer import InMemoryTracer as InMemoryTracer
from .tracer.in_memory_tracer import LogEntry as LogEntry
from .tracer.open_telemetry_tracer import OpenTelemetryTracer as OpenTelemetryTracer
from .tracer.persistent_tracer import EndSpan as EndSpan
from .tracer.persistent_tracer import EndTask as EndTask
from .tracer.persistent_tracer import LogLine as LogLine
from .tracer.persistent_tracer import PersistentSpan as PersistentSpan
from .tracer.persistent_tracer import PersistentTaskSpan as PersistentTaskSpan
from .tracer.persistent_tracer import PersistentTracer as PersistentTracer
from .tracer.persistent_tracer import PlainEntry as PlainEntry
from .tracer.persistent_tracer import StartSpan as StartSpan
from .tracer.persistent_tracer import StartTask as StartTask
from .tracer.persistent_tracer import TracerLogEntryFailed as TracerLogEntryFailed
from .tracer.tracer import EndSpan as EndSpan
from .tracer.tracer import EndTask as EndTask
from .tracer.tracer import JsonSerializer as JsonSerializer
from .tracer.tracer import LogEntry as LogEntry
from .tracer.tracer import LogLine as LogLine
from .tracer.tracer import NoOpTracer as NoOpTracer
from .tracer.tracer import PlainEntry as PlainEntry
from .tracer.tracer import PydanticSerializable as PydanticSerializable
from .tracer.tracer import Span as Span
from .tracer.tracer import StartSpan as StartSpan
from .tracer.tracer import StartTask as StartTask
from .tracer.tracer import TaskSpan as TaskSpan
from .tracer.tracer import Tracer as Tracer
from .tracer.tracer import utc_now as utc_now
Expand Down
2 changes: 0 additions & 2 deletions src/intelligence_layer/core/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ def run(self, input: Input, tracer: Tracer) -> Output:
Args:
input: Generic input defined by the task implementation
tracer: The `Tracer` used for tracing.
trace_id: An optional id of the run, used to track its trace.
Returns:
Generic output defined by the task implementation.
"""
Expand All @@ -95,7 +94,6 @@ def run_concurrently(
concurrency_limit: An optional additional limit for the number of concurrently executed task for
this method call. This can be used to prevent queue-full or similar error of downstream APIs
when the global concurrency limit is too high for a certain task.
trace_id: An optional id of the run, used to track its trace.
Returns:
The Outputs generated by calling `run` for each given Input.
Expand Down
7 changes: 4 additions & 3 deletions src/intelligence_layer/core/tracer/composite_tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
ExportedSpan,
PydanticSerializable,
Span,
SpanStatus,
TaskSpan,
Tracer,
utc_now,
Expand Down Expand Up @@ -96,7 +97,7 @@ def end(self, timestamp: Optional[datetime] = None) -> None:
tracer.end(timestamp)

@property
def status_code(self):
def status_code(self) -> SpanStatus:
status_codes = {tracer.status_code for tracer in self.tracers}
if len(status_codes) > 1:
raise ValueError(
Expand All @@ -105,9 +106,9 @@ def status_code(self):
return next(iter(status_codes))

@status_code.setter
def status_code(self, value):
def status_code(self, span_status: SpanStatus) -> None:
for tracer in self.tracers:
tracer.status_code = value
tracer.status_code = span_status


class CompositeTaskSpan(CompositeSpan[TaskSpan], TaskSpan):
Expand Down
10 changes: 6 additions & 4 deletions src/intelligence_layer/core/tracer/file_tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,18 @@
from json import loads
from pathlib import Path
from typing import Optional
from uuid import UUID

from pydantic import BaseModel

from intelligence_layer.core.tracer.in_memory_tracer import InMemoryTracer
from intelligence_layer.core.tracer.persistent_tracer import (
LogLine,
PersistentSpan,
PersistentTaskSpan,
PersistentTracer,
)
from intelligence_layer.core.tracer.tracer import Context, LogLine, PydanticSerializable
from intelligence_layer.core.tracer.tracer import Context, PydanticSerializable


class FileTracer(PersistentTracer):
Expand All @@ -30,11 +32,11 @@ class FileTracer(PersistentTracer):
the child-elements for a tracer can be identified by referring to this id as parent.
"""

def __init__(self, log_file_path: Path | str, *args, **kwargs) -> None:
def __init__(self, log_file_path: Path | str) -> None:
super().__init__()
self._log_file_path = Path(log_file_path)

def _log_entry(self, id: str, entry: BaseModel) -> None:
def _log_entry(self, id: UUID, entry: BaseModel) -> None:
self._log_file_path.parent.mkdir(parents=True, exist_ok=True)
with self._log_file_path.open(mode="a", encoding="utf-8") as f:
f.write(
Expand Down Expand Up @@ -66,7 +68,7 @@ def task_span(
self._log_task(task, task_name, input, timestamp)
return task

def trace(self, trace_id: Optional[str] = None) -> InMemoryTracer:
def traces(self, trace_id: Optional[str] = None) -> InMemoryTracer:
with self._log_file_path.open("r") as f:
traces = (LogLine.model_validate(loads(line)) for line in f)
filtered_traces = (
Expand Down
123 changes: 45 additions & 78 deletions src/intelligence_layer/core/tracer/in_memory_tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,24 @@

import requests
import rich
from pydantic import SerializeAsAny
from pydantic import BaseModel, Field, SerializeAsAny
from requests import HTTPError
from rich.panel import Panel
from rich.syntax import Syntax
from rich.tree import Tree

from intelligence_layer.core.tracer.tracer import (
Context,
EndSpan,
EndTask,
Event,
ExportedSpan,
ExportedSpanList,
LogEntry,
LogLine,
PlainEntry,
JsonSerializer,
PydanticSerializable,
Span,
SpanAttributes,
StartSpan,
StartTask,
TaskSpan,
TaskSpanAttributes,
Tracer,
_render_log_value,
utc_now,
)

Expand Down Expand Up @@ -176,6 +171,8 @@ def export_for_viewing(self) -> Sequence[ExportedSpan]:
raise RuntimeError(
"Span is not closed. A Span must be closed before it is exported for viewing."
)
assert self.end_timestamp is not None

logs: list[LogEntry] = []
exported_spans: list[ExportedSpan] = []
for entry in self.entries:
Expand Down Expand Up @@ -238,72 +235,42 @@ def _rich_render_(self) -> Tree:
return tree


class TreeBuilder:
def __init__(self) -> None:
self.root = InMemoryTracer()
self.tracers: dict[UUID, InMemoryTracer] = dict()
self.tasks: dict[UUID, InMemoryTaskSpan] = dict()
self.spans: dict[UUID, InMemorySpan] = dict()

def start_task(self, log_line: LogLine) -> None:
start_task = StartTask.model_validate(log_line.entry)
converted_span = InMemoryTaskSpan(
name=start_task.name,
input=start_task.input,
start_timestamp=start_task.start,
context=Context(
trace_id=start_task.trace_id, span_id=str(start_task.parent)
)
if start_task.trace_id != str(start_task.uuid)
else None,
)
# if root, also change the trace id
if converted_span.context.trace_id == converted_span.context.span_id:
converted_span.context.trace_id = str(start_task.uuid)
converted_span.context.span_id = str(start_task.uuid)
self.tracers.get(start_task.parent, self.root).entries.append(converted_span)
self.tracers[start_task.uuid] = converted_span
self.tasks[start_task.uuid] = converted_span

def end_task(self, log_line: LogLine) -> None:
end_task = EndTask.model_validate(log_line.entry)
task_span = self.tasks[end_task.uuid]
task_span.record_output(end_task.output)
task_span.status_code = end_task.status_code
task_span.end(end_task.end)

def start_span(self, log_line: LogLine) -> None:
start_span = StartSpan.model_validate(log_line.entry)
converted_span = InMemorySpan(
name=start_span.name,
start_timestamp=start_span.start,
context=Context(
trace_id=start_span.trace_id, span_id=str(start_span.parent)
)
if start_span.trace_id != str(start_span.uuid)
else None,
)
# if root, also change the trace id
if converted_span.context.trace_id == converted_span.context.span_id:
converted_span.context.trace_id = str(start_span.uuid)
converted_span.context.span_id = str(start_span.uuid)

self.tracers.get(start_span.parent, self.root).entries.append(converted_span)
self.tracers[start_span.uuid] = converted_span
self.spans[start_span.uuid] = converted_span

def end_span(self, log_line: LogLine) -> None:
end_span = EndSpan.model_validate(log_line.entry)
span = self.spans[end_span.uuid]
span.status_code = end_span.status_code
span.end(end_span.end)

def plain_entry(self, log_line: LogLine) -> None:
plain_entry = PlainEntry.model_validate(log_line.entry)
entry = LogEntry(
message=plain_entry.message,
value=plain_entry.value,
timestamp=plain_entry.timestamp,
trace_id=plain_entry.trace_id,
)
self.tracers[plain_entry.parent].entries.append(entry)
class LogEntry(BaseModel):
"""An individual log entry, currently used to represent individual logs by the
`InMemoryTracer`.
Attributes:
message: A description of the value you are logging, such as the step in the task this
is related to.
value: The relevant data you want to log. Can be anything that is serializable by
Pydantic, which gives the tracers flexibility in how they store and emit the logs.
timestamp: The time that the log was emitted.
id: The ID of the trace to which this log entry belongs.
"""

message: str
value: SerializeAsAny[PydanticSerializable]
timestamp: datetime = Field(default_factory=datetime.utcnow)
trace_id: UUID

def _rich_render_(self) -> Panel:
"""Renders the trace via classes in the `rich` package"""
return _render_log_value(self.value, self.message)

def _ipython_display_(self) -> None:
"""Default rendering for Jupyter notebooks"""
from rich import print

print(self._rich_render_())


def _render_log_value(value: PydanticSerializable, title: str) -> Panel:
value = value if isinstance(value, BaseModel) else JsonSerializer(root=value)
return Panel(
Syntax(
value.model_dump_json(indent=2, exclude_defaults=True),
"json",
word_wrap=True,
),
title=title,
)
17 changes: 15 additions & 2 deletions src/intelligence_layer/core/tracer/open_telemetry_tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,21 @@

from opentelemetry.context import attach, detach
from opentelemetry.trace import Span as OpenTSpan
from opentelemetry.trace import StatusCode
from opentelemetry.trace import Tracer as OpenTTracer
from opentelemetry.trace import set_span_in_context
from pydantic import BaseModel, SerializeAsAny

from intelligence_layer.core.tracer.tracer import (
Context,
ExportedSpan,
JsonSerializer,
PydanticSerializable,
Span,
SpanStatus,
SpanType,
TaskSpan,
Tracer,
_serialize,
)


Expand All @@ -30,6 +34,7 @@ def span(
) -> "OpenTelemetrySpan":
tracer_span = self._tracer.start_span(
name,
attributes={"type": SpanType.SPAN.value},
start_time=None if not timestamp else _open_telemetry_timestamp(timestamp),
)
token = attach(set_span_in_context(tracer_span))
Expand All @@ -43,7 +48,7 @@ def task_span(
) -> "OpenTelemetryTaskSpan":
tracer_span = self._tracer.start_span(
task_name,
attributes={"input": _serialize(input)},
attributes={"input": _serialize(input), "type": SpanType.TASK_SPAN.value},
start_time=None if not timestamp else _open_telemetry_timestamp(timestamp),
)
token = attach(set_span_in_context(tracer_span))
Expand Down Expand Up @@ -86,6 +91,9 @@ def log(

def end(self, timestamp: Optional[datetime] = None) -> None:
super().end(timestamp)
self.open_ts_span.set_status(
StatusCode.OK if self.status_code == SpanStatus.OK else StatusCode.ERROR
)
detach(self._token)
self.open_ts_span.end(
_open_telemetry_timestamp(timestamp) if timestamp is not None else None
Expand All @@ -105,3 +113,8 @@ def _open_telemetry_timestamp(t: datetime) -> int:
# Open telemetry expects *nanoseconds* since epoch
t_float = t.timestamp() * 1e9
return int(t_float)


def _serialize(s: SerializeAsAny[PydanticSerializable]) -> str:
value = s if isinstance(s, BaseModel) else JsonSerializer(root=s)
return value.model_dump_json()
Loading

0 comments on commit 9e41a1b

Please sign in to comment.