Skip to content

Commit

Permalink
Adapt File Tracer
Browse files Browse the repository at this point in the history
  • Loading branch information
FlorianSchepersAA committed May 22, 2024
1 parent 36c63b4 commit c019c0c
Show file tree
Hide file tree
Showing 12 changed files with 146 additions and 90 deletions.
2 changes: 1 addition & 1 deletion src/intelligence_layer/core/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def do_run(self, input: Input, task_span: TaskSpan) -> Output:

@final
def run(
self, input: Input, tracer: Tracer, trace_id: Optional[str] = None
self, input: Input, tracer: Tracer
) -> Output:
"""Executes the implementation of `do_run` for this use case.
Expand Down
19 changes: 17 additions & 2 deletions src/intelligence_layer/core/tracer/composite_tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
from typing import Generic, Optional, Sequence, TypeVar

from intelligence_layer.core.tracer.tracer import (
Context,
ExportedSpan,
PydanticSerializable,
Span,
TaskSpan,
Tracer,
utc_now,
Context
)

TracerVar = TypeVar("TracerVar", bound=Tracer)
Expand Down Expand Up @@ -73,7 +73,10 @@ class CompositeSpan(Generic[SpanVar], CompositeTracer[SpanVar], Span):
Args:
tracers: spans that will be forwarded all subsequent log and span calls.
"""
def __init__(self, tracers: Sequence[SpanVar],context: Optional[Context] = None) -> None:

def __init__(
self, tracers: Sequence[SpanVar], context: Optional[Context] = None
) -> None:
CompositeTracer.__init__(self, tracers)
Span.__init__(self, context=context)

Expand All @@ -92,6 +95,18 @@ def end(self, timestamp: Optional[datetime] = None) -> None:
for tracer in self.tracers:
tracer.end(timestamp)

@property
def status_code(self):
status_codes = {tracer.status_code for tracer in self.tracers}
if len(status_codes) > 1:
raise ValueError("Inconsistent status of traces in composite tracer. Status of all traces should be the same but they are different.")
return status_code[0]

@status_code.setter
def status_code(self, value):
for tracer in self.tracers:
tracer.status_code = value


class CompositeTaskSpan(CompositeSpan[TaskSpan], TaskSpan):
"""A :class:`TaskSpan` that allows for recording to multiple TaskSpans simultaneously.
Expand Down
23 changes: 4 additions & 19 deletions src/intelligence_layer/core/tracer/file_tracer.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from datetime import datetime
from json import loads
from pathlib import Path
from typing import Optional, Sequence
from typing import Optional

from pydantic import BaseModel

Expand All @@ -11,22 +11,8 @@
PersistentTaskSpan,
PersistentTracer,
)
from intelligence_layer.core.tracer.tracer import LogLine, PydanticSerializable
from intelligence_layer.core.tracer.tracer import (
Context,
Event,
ExportedSpan,
ExportedSpanList,
LogEntry,
PydanticSerializable,
Span,
SpanAttributes,
TaskSpan,
TaskSpanAttributes,
Tracer,
_render_log_value,
utc_now,
)
from intelligence_layer.core.tracer.tracer import Context, LogLine, PydanticSerializable


class FileTracer(PersistentTracer):
"""A `Tracer` that logs to a file.
Expand Down Expand Up @@ -91,16 +77,15 @@ def trace(self, trace_id: Optional[str] = None) -> InMemoryTracer:
return self._parse_log(filtered_traces)



class FileSpan(PersistentSpan, FileTracer):
"""A `Span` created by `FileTracer.span`."""

def __init__(self, log_file_path: Path, context: Optional[Context] = None) -> None:
PersistentSpan.__init__(self, context=context)
FileTracer.__init__(self, log_file_path=log_file_path)



class FileTaskSpan(PersistentTaskSpan, FileSpan):
"""A `TaskSpan` created by `FileTracer.task_span`."""

pass
20 changes: 15 additions & 5 deletions src/intelligence_layer/core/tracer/in_memory_tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import requests
import rich
from pydantic import BaseModel, Field, SerializeAsAny
from pydantic import SerializeAsAny
from requests import HTTPError
from rich.tree import Tree

Expand Down Expand Up @@ -157,6 +157,7 @@ def log(
def end(self, timestamp: Optional[datetime] = None) -> None:
if not self.end_timestamp:
self.end_timestamp = timestamp or utc_now()
super().end(timestamp)

def _rich_render_(self) -> Tree:
"""Renders the trace via classes in the `rich` package"""
Expand All @@ -171,6 +172,8 @@ def _span_attributes(self) -> SpanAttributes:
return SpanAttributes()

def export_for_viewing(self) -> Sequence[ExportedSpan]:
if not self._closed:
raise RuntimeError("Span is not closed. A Span must be closed befor it is exported for viewing.")
logs: list[LogEntry] = []
exported_spans: list[ExportedSpan] = []
for entry in self.entries:
Expand Down Expand Up @@ -246,9 +249,11 @@ def start_task(self, log_line: LogLine) -> None:
name=start_task.name,
input=start_task.input,
start_timestamp=start_task.start,
context=Context(
context=Context(
trace_id=start_task.trace_id, span_id=str(start_task.parent)
) if start_task.trace_id != str(start_task.uuid) else None
)
if start_task.trace_id != str(start_task.uuid)
else None,
)
# if root, also change the trace id
if converted_span.context.trace_id == converted_span.context.span_id:
Expand All @@ -262,6 +267,7 @@ def end_task(self, log_line: LogLine) -> None:
end_task = EndTask.model_validate(log_line.entry)
task_span = self.tasks[end_task.uuid]
task_span.record_output(end_task.output)
task_span.status_code = end_task.status_code
task_span.end(end_task.end)

def start_span(self, log_line: LogLine) -> None:
Expand All @@ -271,22 +277,26 @@ def start_span(self, log_line: LogLine) -> None:
start_timestamp=start_span.start,
context=Context(
trace_id=start_span.trace_id, span_id=str(start_span.parent)
) if start_span.trace_id != str(start_span.uuid) else None
)
if start_span.trace_id != str(start_span.uuid)
else None,
)
# if root, also change the trace id
if converted_span.context.trace_id == converted_span.context.span_id:
converted_span.context.trace_id = str(start_span.uuid)
converted_span.context.span_id = str(start_span.uuid)

self.tracers.get(start_span.parent, self.root).entries.append(converted_span)
self.tracers[start_span.uuid] = converted_span
self.spans[start_span.uuid] = converted_span

def end_span(self, log_line: LogLine) -> None:
end_span = EndSpan.model_validate(log_line.entry)
span = self.spans[end_span.uuid]
span.status_code = end_span.status_code
span.end(end_span.end)


def plain_entry(self, log_line: LogLine) -> None:
plain_entry = PlainEntry.model_validate(log_line.entry)
entry = LogEntry(
Expand Down
11 changes: 9 additions & 2 deletions src/intelligence_layer/core/tracer/open_telemetry_tracer.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
from datetime import datetime
from typing import Optional
from typing import Optional, Sequence

from opentelemetry.context import attach, detach
from opentelemetry.trace import Span as OpenTSpan
from opentelemetry.trace import Tracer as OpenTTracer
from opentelemetry.trace import set_span_in_context

from intelligence_layer.core.tracer.tracer import (
ExportedSpan,
LogEntry,
PydanticSerializable,
Span,
TaskSpan,
Expand All @@ -19,7 +21,7 @@ class OpenTelemetryTracer(Tracer):
"""A `Tracer` that uses open telemetry."""

def __init__(self, tracer: OpenTTracer) -> None:
self._tracer = tracer
self.O_tracer = tracer

def span(
self,
Expand All @@ -34,6 +36,7 @@ def span(
start_time=None if not timestamp else _open_telemetry_timestamp(timestamp),
)
token = attach(set_span_in_context(tracer_span))
self._tracer
return OpenTelemetrySpan(tracer_span, self._tracer, token, trace_id)

def task_span(
Expand All @@ -52,6 +55,9 @@ def task_span(
)
token = attach(set_span_in_context(tracer_span))
return OpenTelemetryTaskSpan(tracer_span, self._tracer, token, trace_id)

def export_for_viewing(self) -> Sequence[ExportedSpan]:
raise NotImplementedError("The OpenTelemetryTracer does not support export for viewing, as it can not acces its own traces.")


class OpenTelemetrySpan(Span, OpenTelemetryTracer):
Expand Down Expand Up @@ -87,6 +93,7 @@ def end(self, timestamp: Optional[datetime] = None) -> None:
self.open_ts_span.end(
_open_telemetry_timestamp(timestamp) if timestamp is not None else None
)
super().end(timestamp)


class OpenTelemetryTaskSpan(TaskSpan, OpenTelemetrySpan):
Expand Down
38 changes: 18 additions & 20 deletions src/intelligence_layer/core/tracer/persistent_tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from intelligence_layer.core.tracer.tracer import (
EndSpan,
EndTask,
ExportedSpan,
LogLine,
PlainEntry,
PydanticSerializable,
Expand All @@ -20,26 +21,11 @@
utc_now,
)

from intelligence_layer.core.tracer.tracer import (
Context,
Event,
ExportedSpan,
ExportedSpanList,
LogEntry,
PydanticSerializable,
Span,
SpanAttributes,
TaskSpan,
TaskSpanAttributes,
Tracer,
_render_log_value,
utc_now,
)

class PersistentTracer(Tracer, ABC):
def __init__(self) -> None:
self.current_id = uuid4()

@abstractmethod
def _log_entry(self, id: str, entry: BaseModel) -> None:
pass
Expand Down Expand Up @@ -77,7 +63,9 @@ def _log_task(
task_span.context.trace_id,
StartTask(
uuid=task_span.context.span_id,
parent=self.context.span_id if self.context else task_span.context.trace_id,
parent=self.context.span_id
if self.context
else task_span.context.trace_id,
name=task_name,
start=timestamp or utc_now(),
input=input,
Expand All @@ -89,7 +77,9 @@ def _log_task(
task_span.context.trace_id,
StartTask(
uuid=task_span.context.span_id,
parent=self.context.span_id if self.context else task_span.context.trace_id,
parent=self.context.span_id
if self.context
else task_span.context.trace_id,
name=task_name,
start=timestamp or utc_now(),
input=error.description,
Expand Down Expand Up @@ -151,7 +141,10 @@ def log(
def end(self, timestamp: Optional[datetime] = None) -> None:
if not self.end_timestamp:
self.end_timestamp = timestamp or utc_now()
self._log_entry(self.context.trace_id, EndSpan(uuid=self.context.span_id, end=self.end_timestamp))
self._log_entry(
self.context.trace_id,
EndSpan(uuid=self.context.span_id, end=self.end_timestamp, status_code=self.status_code),
)


class PersistentTaskSpan(TaskSpan, PersistentSpan, ABC):
Expand All @@ -165,7 +158,12 @@ def end(self, timestamp: Optional[datetime] = None) -> None:
self.end_timestamp = timestamp or utc_now()
self._log_entry(
self.context.trace_id,
EndTask(uuid=self.context.span_id, end=self.end_timestamp, output=self.output),
EndTask(
uuid=self.context.span_id,
end=self.end_timestamp,
output=self.output,
status_code=self.status_code
),
)


Expand Down
8 changes: 5 additions & 3 deletions src/intelligence_layer/core/tracer/tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ class Span(Tracer, AbstractContextManager["Span"]):
"""

def __init__(self, context: Optional[Context] = None):
#super().__init__()
# super().__init__()
span_id = str(uuid4())
if context is None:
trace_id = span_id
Expand All @@ -216,7 +216,7 @@ def __init__(self, context: Optional[Context] = None):

def __enter__(self) -> Self:
if self._closed:
raise ValueError("Spans cannot be opened once they have been close.")
raise ValueError("Spans cannot be opened once they have been closed.")
return self

@abstractmethod
Expand Down Expand Up @@ -252,7 +252,7 @@ def end(self, timestamp: Optional[datetime] = None) -> None:
Args:
timestamp: Optional override of the timestamp, otherwise should be set to now.
"""
...
self._closed = True

def ensure_id(self, id: str | None) -> str:
"""Returns a valid id for tracing.
Expand Down Expand Up @@ -444,6 +444,7 @@ class EndTask(BaseModel):
uuid: UUID
end: datetime
output: SerializeAsAny[PydanticSerializable]
status_code: SpanStatus = SpanStatus.OK


class StartSpan(BaseModel):
Expand Down Expand Up @@ -475,6 +476,7 @@ class EndSpan(BaseModel):

uuid: UUID
end: datetime
status_code: SpanStatus = SpanStatus.OK


class PlainEntry(BaseModel):
Expand Down
11 changes: 11 additions & 0 deletions tests/core/tracer/fixtures/old_file_trace_format.jsonl
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{"trace_id":"00b96d92-d1d1-454a-b8e9-f67e8541759a","entry_type":"StartTask","entry":{"uuid":"41528209-1b78-4785-a00d-7f65af1bb09c","parent":"75e79a11-1a26-4731-8b49-ef8634c352ed","name":"TestTask","start":"2024-05-22T09:43:37.428758Z","input":"input","trace_id":"00b96d92-d1d1-454a-b8e9-f67e8541759a"}}
{"trace_id":"00b96d92-d1d1-454a-b8e9-f67e8541759a","entry_type":"StartSpan","entry":{"uuid":"ad1ed79b-6ad6-4ea5-8ee8-26be4055e228","parent":"41528209-1b78-4785-a00d-7f65af1bb09c","name":"span","start":"2024-05-22T09:43:37.429448Z","trace_id":"00b96d92-d1d1-454a-b8e9-f67e8541759a"}}
{"trace_id":"00b96d92-d1d1-454a-b8e9-f67e8541759a","entry_type":"PlainEntry","entry":{"message":"message","value":"a value","timestamp":"2024-05-22T09:43:37.429503Z","parent":"ad1ed79b-6ad6-4ea5-8ee8-26be4055e228","trace_id":"00b96d92-d1d1-454a-b8e9-f67e8541759a"}}
{"trace_id":"00b96d92-d1d1-454a-b8e9-f67e8541759a","entry_type":"StartTask","entry":{"uuid":"e8cca541-57a8-440a-b848-7c3b33a97f52","parent":"ad1ed79b-6ad6-4ea5-8ee8-26be4055e228","name":"TestSubTask","start":"2024-05-22T09:43:37.429561Z","input":null,"trace_id":"00b96d92-d1d1-454a-b8e9-f67e8541759a"}}
{"trace_id":"00b96d92-d1d1-454a-b8e9-f67e8541759a","entry_type":"PlainEntry","entry":{"message":"subtask","value":"value","timestamp":"2024-05-22T09:43:37.429605Z","parent":"e8cca541-57a8-440a-b848-7c3b33a97f52","trace_id":"00b96d92-d1d1-454a-b8e9-f67e8541759a"}}
{"trace_id":"00b96d92-d1d1-454a-b8e9-f67e8541759a","entry_type":"EndTask","entry":{"uuid":"e8cca541-57a8-440a-b848-7c3b33a97f52","end":"2024-05-22T09:43:37.429647Z","output":null}}
{"trace_id":"00b96d92-d1d1-454a-b8e9-f67e8541759a","entry_type":"EndSpan","entry":{"uuid":"ad1ed79b-6ad6-4ea5-8ee8-26be4055e228","end":"2024-05-22T09:43:37.429687Z"}}
{"trace_id":"00b96d92-d1d1-454a-b8e9-f67e8541759a","entry_type":"StartTask","entry":{"uuid":"8840185c-2019-4105-9178-1b0e20ab6388","parent":"41528209-1b78-4785-a00d-7f65af1bb09c","name":"TestSubTask","start":"2024-05-22T09:43:37.429728Z","input":null,"trace_id":"00b96d92-d1d1-454a-b8e9-f67e8541759a"}}
{"trace_id":"00b96d92-d1d1-454a-b8e9-f67e8541759a","entry_type":"PlainEntry","entry":{"message":"subtask","value":"value","timestamp":"2024-05-22T09:43:37.429768Z","parent":"8840185c-2019-4105-9178-1b0e20ab6388","trace_id":"00b96d92-d1d1-454a-b8e9-f67e8541759a"}}
{"trace_id":"00b96d92-d1d1-454a-b8e9-f67e8541759a","entry_type":"EndTask","entry":{"uuid":"8840185c-2019-4105-9178-1b0e20ab6388","end":"2024-05-22T09:43:37.429806Z","output":null}}
{"trace_id":"00b96d92-d1d1-454a-b8e9-f67e8541759a","entry_type":"EndTask","entry":{"uuid":"41528209-1b78-4785-a00d-7f65af1bb09c","end":"2024-05-22T09:43:37.429842Z","output":"output"}}
Loading

0 comments on commit c019c0c

Please sign in to comment.