Skip to content

Commit

Permalink
Fix circular dependency
Browse files Browse the repository at this point in the history
  • Loading branch information
MerlinKallenbornTNG committed Feb 28, 2024
1 parent 885cbdc commit 0003a2b
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 161 deletions.
7 changes: 2 additions & 5 deletions src/intelligence_layer/core/tracer/persistent_tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,19 @@

from pydantic import BaseModel

from intelligence_layer.core import (
InMemoryTracer,
from intelligence_layer.core.tracer.in_memory_tracer import TreeBuilder, InMemoryTracer
from intelligence_layer.core.tracer.tracer import (
PydanticSerializable,
Span,
TaskSpan,
Tracer,
utc_now,
)
from intelligence_layer.core.tracer.tracer import (
EndSpan,
EndTask,
LogLine,
PlainEntry,
StartSpan,
StartTask,
TreeBuilder,
)


Expand Down
158 changes: 2 additions & 156 deletions src/intelligence_layer/core/tracer/tracer.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
from abc import ABC, abstractmethod
from contextlib import AbstractContextManager
from datetime import datetime, timezone
from json import loads
from pathlib import Path

from types import TracebackType
from typing import TYPE_CHECKING, Mapping, Optional, Sequence, TypeVar
from uuid import UUID, uuid4
Expand All @@ -12,12 +11,6 @@
from rich.syntax import Syntax
from typing_extensions import Self, TypeAliasType

from intelligence_layer.core import InMemorySpan, InMemoryTaskSpan, InMemoryTracer
from intelligence_layer.core.tracer.persistent_tracer import (
PersistentSpan,
PersistentTaskSpan,
PersistentTracer,
)

if TYPE_CHECKING:
PydanticSerializable = (
Expand Down Expand Up @@ -320,12 +313,6 @@ def _ipython_display_(self) -> None:

print(self._rich_render_())


# Required for sphinx, see also: https://docs.pydantic.dev/2.4/errors/usage_errors/#class-not-fully-defined
InMemorySpan.model_rebuild()
InMemoryTracer.model_rebuild()


class StartTask(BaseModel):
"""Represents the payload/entry of a log-line indicating that a `TaskSpan` was opened through `Tracer.task_span`.
Expand Down Expand Up @@ -413,7 +400,7 @@ class PlainEntry(BaseModel):


class LogLine(BaseModel):
"""Represents a a complete log-line.
"""Represents a complete log-line.
Attributes:
entry_type: The type of the entry. This is the class-name of one of the classes
Expand All @@ -426,147 +413,6 @@ class LogLine(BaseModel):
entry_type: str
entry: SerializeAsAny[PydanticSerializable]


class TreeBuilder(BaseModel):
root: InMemoryTracer = InMemoryTracer()
tracers: dict[UUID, InMemoryTracer] = Field(default_factory=dict)
tasks: dict[UUID, InMemoryTaskSpan] = Field(default_factory=dict)
spans: dict[UUID, InMemorySpan] = Field(default_factory=dict)

def start_task(self, log_line: LogLine) -> None:
start_task = StartTask.model_validate(log_line.entry)
child = InMemoryTaskSpan(
name=start_task.name,
input=start_task.input,
start_timestamp=start_task.start,
trace_id=start_task.trace_id,
)
self.tracers[start_task.uuid] = child
self.tasks[start_task.uuid] = child
self.tracers.get(start_task.parent, self.root).entries.append(child)

def end_task(self, log_line: LogLine) -> None:
end_task = EndTask.model_validate(log_line.entry)
task_span = self.tasks[end_task.uuid]
task_span.end_timestamp = end_task.end
task_span.record_output(end_task.output)

def start_span(self, log_line: LogLine) -> None:
start_span = StartSpan.model_validate(log_line.entry)
child = InMemorySpan(
name=start_span.name,
start_timestamp=start_span.start,
trace_id=start_span.trace_id,
)
self.tracers[start_span.uuid] = child
self.spans[start_span.uuid] = child
self.tracers.get(start_span.parent, self.root).entries.append(child)

def end_span(self, log_line: LogLine) -> None:
end_span = EndSpan.model_validate(log_line.entry)
span = self.spans[end_span.uuid]
span.end_timestamp = end_span.end

def plain_entry(self, log_line: LogLine) -> None:
plain_entry = PlainEntry.model_validate(log_line.entry)
entry = LogEntry(
message=plain_entry.message,
value=plain_entry.value,
timestamp=plain_entry.timestamp,
trace_id=plain_entry.trace_id,
)
self.tracers[plain_entry.parent].entries.append(entry)


TaskSpanVar = TypeVar("TaskSpanVar", bound=TaskSpan)


class FileTracer(PersistentTracer):
"""A `Tracer` that logs to a file.
Each log-entry is represented by a JSON object. The information logged allows
to reconstruct the hierarchical nature of the logs, i.e. all entries have a
_pointer_ to its parent element in form of a parent attribute containing
the uuid of the parent.
Args:
log_file_path: Denotes the file to log to.
Attributes:
uuid: a uuid for the tracer. If multiple :class:`FileTracer` instances log to the same file
the child-elements for a tracer can be identified by referring to this id as parent.
"""

def __init__(self, log_file_path: Path) -> None:
super().__init__()
self._log_file_path = log_file_path

def _log_entry(self, id: str, entry: BaseModel) -> None:
with self._log_file_path.open(mode="a", encoding="utf-8") as f:
f.write(
LogLine(
trace_id=id, entry_type=type(entry).__name__, entry=entry
).model_dump_json()
+ "\n"
)

def span(
self,
name: str,
timestamp: Optional[datetime] = None,
trace_id: Optional[str] = None,
) -> "FileSpan":
span = FileSpan(self._log_file_path, trace_id=self.ensure_id(trace_id))
self._log_span(span, name, timestamp)
return span

def task_span(
self,
task_name: str,
input: PydanticSerializable,
timestamp: Optional[datetime] = None,
trace_id: Optional[str] = None,
) -> "FileTaskSpan":
task = FileTaskSpan(
self._log_file_path,
trace_id=self.ensure_id(trace_id),
)
self._log_task(task, task_name, input, timestamp)
return task

def trace(self, trace_id: Optional[str] = None) -> InMemoryTracer:
with self._log_file_path.open("r") as f:
traces = (LogLine.model_validate(loads(line)) for line in f)
filtered_traces = (
(line for line in traces if line.trace_id == trace_id)
if trace_id is not None
else traces
)
return self._parse_log(filtered_traces)


class FileSpan(PersistentSpan, FileTracer):
"""A `Span` created by `FileTracer.span`."""

def id(self) -> str:
return self.trace_id

def __init__(self, log_file_path: Path, trace_id: str) -> None:
super().__init__(log_file_path)
self.trace_id = trace_id


class FileTaskSpan(PersistentTaskSpan, FileSpan):
"""A `TaskSpan` created by `FileTracer.task_span`."""

def __init__(
self,
log_file_path: Path,
trace_id: str,
) -> None:
super().__init__(log_file_path, trace_id)


def _serialize(s: SerializeAsAny[PydanticSerializable]) -> str:
value = s if isinstance(s, BaseModel) else JsonSerializer(root=s)
return value.model_dump_json()

0 comments on commit 0003a2b

Please sign in to comment.