Skip to content

Commit

Permalink
wip: file tracer
Browse files Browse the repository at this point in the history
  • Loading branch information
NiklasKoehneckeAA committed May 21, 2024
1 parent 875b1df commit 36c63b4
Show file tree
Hide file tree
Showing 7 changed files with 155 additions and 109 deletions.
8 changes: 5 additions & 3 deletions src/intelligence_layer/core/tracer/composite_tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
TaskSpan,
Tracer,
utc_now,
Context
)

TracerVar = TypeVar("TracerVar", bound=Tracer)
Expand Down Expand Up @@ -37,6 +38,7 @@ class CompositeTracer(Tracer, Generic[TracerVar]):
def __init__(self, tracers: Sequence[TracerVar]) -> None:
assert len(tracers) > 0
self.tracers = tracers
self.context = tracers[0].context

def span(
self,
Expand Down Expand Up @@ -71,9 +73,9 @@ class CompositeSpan(Generic[SpanVar], CompositeTracer[SpanVar], Span):
Args:
tracers: spans that will be forwarded all subsequent log and span calls.
"""

def id(self) -> str:
return self.tracers[0].id()
def __init__(self, tracers: Sequence[SpanVar],context: Optional[Context] = None) -> None:
CompositeTracer.__init__(self, tracers)
Span.__init__(self, context=context)

def log(
self,
Expand Down
45 changes: 25 additions & 20 deletions src/intelligence_layer/core/tracer/file_tracer.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from datetime import datetime
from json import loads
from pathlib import Path
from typing import Optional
from typing import Optional, Sequence

from pydantic import BaseModel

Expand All @@ -12,7 +12,21 @@
PersistentTracer,
)
from intelligence_layer.core.tracer.tracer import LogLine, PydanticSerializable

from intelligence_layer.core.tracer.tracer import (
Context,
Event,
ExportedSpan,
ExportedSpanList,
LogEntry,
PydanticSerializable,
Span,
SpanAttributes,
TaskSpan,
TaskSpanAttributes,
Tracer,
_render_log_value,
utc_now,
)

class FileTracer(PersistentTracer):
"""A `Tracer` that logs to a file.
Expand All @@ -30,7 +44,7 @@ class FileTracer(PersistentTracer):
the child-elements for a tracer can be identified by referring to this id as parent.
"""

def __init__(self, log_file_path: Path | str) -> None:
def __init__(self, log_file_path: Path | str, *args, **kwargs) -> None:
super().__init__()
self._log_file_path = Path(log_file_path)

Expand All @@ -48,9 +62,8 @@ def span(
self,
name: str,
timestamp: Optional[datetime] = None,
trace_id: Optional[str] = None,
) -> "FileSpan":
span = FileSpan(self._log_file_path, trace_id=self.ensure_id(trace_id))
span = FileSpan(self._log_file_path, context=self.context)
self._log_span(span, name, timestamp)
return span

Expand All @@ -59,11 +72,10 @@ def task_span(
task_name: str,
input: PydanticSerializable,
timestamp: Optional[datetime] = None,
trace_id: Optional[str] = None,
) -> "FileTaskSpan":
task = FileTaskSpan(
self._log_file_path,
trace_id=self.ensure_id(trace_id),
context=self.context,
)
self._log_task(task, task_name, input, timestamp)
return task
Expand All @@ -79,23 +91,16 @@ def trace(self, trace_id: Optional[str] = None) -> InMemoryTracer:
return self._parse_log(filtered_traces)



class FileSpan(PersistentSpan, FileTracer):
"""A `Span` created by `FileTracer.span`."""

def id(self) -> str:
return self.trace_id

def __init__(self, log_file_path: Path, trace_id: str) -> None:
super().__init__(log_file_path)
self.trace_id = trace_id
def __init__(self, log_file_path: Path, context: Optional[Context] = None) -> None:
PersistentSpan.__init__(self, context=context)
FileTracer.__init__(self, log_file_path=log_file_path)



class FileTaskSpan(PersistentTaskSpan, FileSpan):
"""A `TaskSpan` created by `FileTracer.task_span`."""

def __init__(
self,
log_file_path: Path,
trace_id: str,
) -> None:
super().__init__(log_file_path, trace_id)
pass
128 changes: 74 additions & 54 deletions src/intelligence_layer/core/tracer/in_memory_tracer.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,29 @@
import os
from datetime import datetime
from typing import Optional, Sequence, Union
from uuid import UUID

import requests
import rich
from pydantic import BaseModel, SerializeAsAny
from pydantic import BaseModel, Field, SerializeAsAny
from requests import HTTPError
from rich.tree import Tree

from intelligence_layer.core.tracer.tracer import (
Context,
EndSpan,
EndTask,
Event,
ExportedSpan,
ExportedSpanList,
LogEntry,
LogLine,
PlainEntry,
PydanticSerializable,
Span,
SpanAttributes,
StartSpan,
StartTask,
TaskSpan,
TaskSpanAttributes,
Tracer,
Expand All @@ -37,8 +44,7 @@ class InMemoryTracer(Tracer):
log entries.
"""

def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
def __init__(self) -> None:
self.entries: list[Union[LogEntry, "InMemoryTaskSpan", "InMemorySpan"]] = []

def span(
Expand Down Expand Up @@ -124,7 +130,8 @@ def __init__(
context: Optional[Context] = None,
start_timestamp: Optional[datetime] = None,
) -> None:
super().__init__(context=context)
InMemoryTracer.__init__(self)
Span.__init__(self, context=context)
self.parent_id = None if context is None else context.span_id
self.name = name
self.start_timestamp = (
Expand Down Expand Up @@ -226,53 +233,66 @@ def _rich_render_(self) -> Tree:
return tree


class TreeBuilder(BaseModel):
pass
# root: InMemoryTracer = InMemoryTracer()
# tracers: dict[UUID, InMemoryTracer] = Field(default_factory=dict)
# tasks: dict[UUID, InMemoryTaskSpan] = Field(default_factory=dict)
# spans: dict[UUID, InMemorySpan] = Field(default_factory=dict)

# def start_task(self, log_line: LogLine) -> None:
# start_task = StartTask.model_validate(log_line.entry)
# child = InMemoryTaskSpan(
# name=start_task.name,
# input=start_task.input,
# start_timestamp=start_task.start,
# trace_id=start_task.trace_id,
# )
# self.tracers[start_task.uuid] = child
# self.tasks[start_task.uuid] = child
# self.tracers.get(start_task.parent, self.root).entries.append(child)

# def end_task(self, log_line: LogLine) -> None:
# end_task = EndTask.model_validate(log_line.entry)
# task_span = self.tasks[end_task.uuid]
# task_span.end_timestamp = end_task.end
# task_span.record_output(end_task.output)

# def start_span(self, log_line: LogLine) -> None:
# start_span = StartSpan.model_validate(log_line.entry)
# child = InMemorySpan(
# name=start_span.name,
# start_timestamp=start_span.start,
# trace_id=start_span.trace_id,
# )
# self.tracers[start_span.uuid] = child
# self.spans[start_span.uuid] = child
# self.tracers.get(start_span.parent, self.root).entries.append(child)

# def end_span(self, log_line: LogLine) -> None:
# end_span = EndSpan.model_validate(log_line.entry)
# span = self.spans[end_span.uuid]
# span.end_timestamp = end_span.end

# def plain_entry(self, log_line: LogLine) -> None:
# plain_entry = PlainEntry.model_validate(log_line.entry)
# entry = LogEntry(
# message=plain_entry.message,
# value=plain_entry.value,
# timestamp=plain_entry.timestamp,
# trace_id=plain_entry.trace_id,
# )
# self.tracers[plain_entry.parent].entries.append(entry)
class TreeBuilder:
def __init__(self) -> None:
self.root = InMemoryTracer()
self.tracers: dict[UUID, InMemoryTracer] = dict()
self.tasks: dict[UUID, InMemoryTaskSpan] = dict()
self.spans: dict[UUID, InMemorySpan] = dict()

def start_task(self, log_line: LogLine) -> None:
start_task = StartTask.model_validate(log_line.entry)
converted_span = InMemoryTaskSpan(
name=start_task.name,
input=start_task.input,
start_timestamp=start_task.start,
context=Context(
trace_id=start_task.trace_id, span_id=str(start_task.parent)
) if start_task.trace_id != str(start_task.uuid) else None
)
# if root, also change the trace id
if converted_span.context.trace_id == converted_span.context.span_id:
converted_span.context.trace_id = str(start_task.uuid)
converted_span.context.span_id = str(start_task.uuid)
self.tracers.get(start_task.parent, self.root).entries.append(converted_span)
self.tracers[start_task.uuid] = converted_span
self.tasks[start_task.uuid] = converted_span

def end_task(self, log_line: LogLine) -> None:
end_task = EndTask.model_validate(log_line.entry)
task_span = self.tasks[end_task.uuid]
task_span.record_output(end_task.output)
task_span.end(end_task.end)

def start_span(self, log_line: LogLine) -> None:
start_span = StartSpan.model_validate(log_line.entry)
converted_span = InMemorySpan(
name=start_span.name,
start_timestamp=start_span.start,
context=Context(
trace_id=start_span.trace_id, span_id=str(start_span.parent)
) if start_span.trace_id != str(start_span.uuid) else None
)
# if root, also change the trace id
if converted_span.context.trace_id == converted_span.context.span_id:
converted_span.context.trace_id = str(start_span.uuid)
converted_span.context.span_id = str(start_span.uuid)

self.tracers.get(start_span.parent, self.root).entries.append(converted_span)
self.tracers[start_span.uuid] = converted_span
self.spans[start_span.uuid] = converted_span

def end_span(self, log_line: LogLine) -> None:
end_span = EndSpan.model_validate(log_line.entry)
span = self.spans[end_span.uuid]
span.end(end_span.end)

def plain_entry(self, log_line: LogLine) -> None:
plain_entry = PlainEntry.model_validate(log_line.entry)
entry = LogEntry(
message=plain_entry.message,
value=plain_entry.value,
timestamp=plain_entry.timestamp,
trace_id=plain_entry.trace_id,
)
self.tracers[plain_entry.parent].entries.append(entry)
Loading

0 comments on commit 36c63b4

Please sign in to comment.