Skip to content

Commit

Permalink
refactor(tracer): split large tracer file into multiple specific trac…
Browse files Browse the repository at this point in the history
…er files

TASK: IL-297
  • Loading branch information
MerlinKallenbornTNG committed Feb 28, 2024
1 parent b8206f1 commit 9332b16
Show file tree
Hide file tree
Showing 10 changed files with 508 additions and 452 deletions.
2 changes: 1 addition & 1 deletion Concepts.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class Task(ABC, Generic[Input, Output]):

`Input` and `Output` are normal Python datatypes that can be serialized from and to JSON. For this the Intelligence
Layer relies on [Pydantic](https://docs.pydantic.dev/). The types that can actually be used are defined in form
of the type-alias [`PydanticSerializable`](src/intelligence_layer/core/tracer.py#L44).
of the type-alias [`PydanticSerializable`](src/intelligence_layer/core/tracer/tracer.py#L44).

The second parameter `task_span` is used for [tracing](#Trace) which is described below.

Expand Down
30 changes: 15 additions & 15 deletions src/intelligence_layer/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,20 +41,20 @@
from .text_highlight import TextHighlight as TextHighlight
from .text_highlight import TextHighlightInput as TextHighlightInput
from .text_highlight import TextHighlightOutput as TextHighlightOutput
from .tracer import CompositeTracer as CompositeTracer
from .tracer import FileSpan as FileSpan
from .tracer import FileTaskSpan as FileTaskSpan
from .tracer import FileTracer as FileTracer
from .tracer import InMemorySpan as InMemorySpan
from .tracer import InMemoryTaskSpan as InMemoryTaskSpan
from .tracer import InMemoryTracer as InMemoryTracer
from .tracer import LogEntry as LogEntry
from .tracer import NoOpTracer as NoOpTracer
from .tracer import OpenTelemetryTracer as OpenTelemetryTracer
from .tracer import PydanticSerializable as PydanticSerializable
from .tracer import Span as Span
from .tracer import TaskSpan as TaskSpan
from .tracer import Tracer as Tracer
from .tracer import utc_now as utc_now
from .tracer.composite_tracer import CompositeTracer as CompositeTracer
from .tracer.in_memory_tracer import InMemorySpan as InMemorySpan
from .tracer.in_memory_tracer import InMemoryTaskSpan as InMemoryTaskSpan
from .tracer.in_memory_tracer import InMemoryTracer as InMemoryTracer
from .tracer.open_telemetry_tracer import OpenTelemetryTracer as OpenTelemetryTracer
from .tracer.tracer import FileSpan as FileSpan
from .tracer.tracer import FileTaskSpan as FileTaskSpan
from .tracer.tracer import FileTracer as FileTracer
from .tracer.tracer import LogEntry as LogEntry
from .tracer.tracer import NoOpTracer as NoOpTracer
from .tracer.tracer import PydanticSerializable as PydanticSerializable
from .tracer.tracer import Span as Span
from .tracer.tracer import TaskSpan as TaskSpan
from .tracer.tracer import Tracer as Tracer
from .tracer.tracer import utc_now as utc_now

# Export every name bound in this module's namespace at this point.
# `dir()` is evaluated at module scope, so the list also contains the
# module's dunder names, not only the symbols imported above.
__all__ = [symbol for symbol in dir()]
Empty file.
108 changes: 108 additions & 0 deletions src/intelligence_layer/core/tracer/composite_tracer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
from datetime import datetime
from typing import Generic, Optional, Sequence

from intelligence_layer.core import (
Chunk,
InMemoryTracer,
PydanticSerializable,
Span,
TaskSpan,
Tracer,
utc_now,
)
from intelligence_layer.core.tracer.tracer import SpanVar, TracerVar
from intelligence_layer.use_cases import ClassifyInput, PromptBasedClassify


class CompositeTracer(Tracer, Generic[TracerVar]):
    """A :class:`Tracer` that allows for recording to multiple tracers simultaneously.

    Each log-entry and span will be forwarded to all subtracers.

    Args:
        tracers: tracers that will be forwarded all subsequent log and span calls.
            Must contain at least one tracer.

    Raises:
        ValueError: If ``tracers`` is empty.

    Example:
        >>> from intelligence_layer.core import InMemoryTracer, FileTracer, CompositeTracer, Chunk
        >>> from intelligence_layer.use_cases import PromptBasedClassify, ClassifyInput
        >>> tracer_1 = InMemoryTracer()
        >>> tracer_2 = InMemoryTracer()
        >>> tracer = CompositeTracer([tracer_1, tracer_2])
        >>> task = PromptBasedClassify()
        >>> response = task.run(ClassifyInput(chunk=Chunk("Cool"), labels=frozenset({"label", "other label"})), tracer)
    """

    def __init__(self, tracers: Sequence[TracerVar]) -> None:
        # Validate explicitly rather than with `assert`: assertions are removed
        # under `python -O`, which would let an empty sequence through and only
        # fail later, e.g. when a composite span indexes `self.tracers[0]`.
        if len(tracers) == 0:
            raise ValueError("CompositeTracer requires at least one tracer.")
        self.tracers = tracers

    def span(
        self,
        name: str,
        timestamp: Optional[datetime] = None,
        trace_id: Optional[str] = None,
    ) -> "CompositeSpan[Span]":
        """Open a span with the same name on every sub-tracer.

        Args:
            name: Name passed to each sub-tracer's ``span``.
            timestamp: Start time; defaults to ``utc_now()``.
            trace_id: Trace id; passed through ``ensure_id`` before use.

        Returns:
            A :class:`CompositeSpan` wrapping the spans of all sub-tracers.
        """
        # Resolve timestamp and id once so every sub-tracer receives the very
        # same values.
        timestamp = timestamp or utc_now()
        trace_id = self.ensure_id(trace_id)
        return CompositeSpan(
            [tracer.span(name, timestamp, trace_id) for tracer in self.tracers]
        )

    def task_span(
        self,
        task_name: str,
        input: PydanticSerializable,
        timestamp: Optional[datetime] = None,
        trace_id: Optional[str] = None,
    ) -> "CompositeTaskSpan":
        """Open a task span with the same name and input on every sub-tracer.

        Args:
            task_name: Name passed to each sub-tracer's ``task_span``.
            input: Task input passed to each sub-tracer's ``task_span``.
            timestamp: Start time; defaults to ``utc_now()``.
            trace_id: Trace id; passed through ``ensure_id`` before use.

        Returns:
            A :class:`CompositeTaskSpan` wrapping the task spans of all sub-tracers.
        """
        # Resolve timestamp and id once so every sub-tracer receives the very
        # same values.
        timestamp = timestamp or utc_now()
        trace_id = self.ensure_id(trace_id)
        return CompositeTaskSpan(
            [
                tracer.task_span(task_name, input, timestamp, trace_id)
                for tracer in self.tracers
            ]
        )


class CompositeSpan(Generic[SpanVar], CompositeTracer[SpanVar], Span):
    """A :class:`Span` that fans every call out to a group of wrapped spans.

    Log entries and the end signal are forwarded to each wrapped span in turn.

    Args:
        tracers: spans that will be forwarded all subsequent log and span calls.
    """

    def id(self) -> str:
        """Return the id reported by the first wrapped span."""
        first_span = self.tracers[0]
        return first_span.id()

    def log(
        self,
        message: str,
        value: PydanticSerializable,
        timestamp: Optional[datetime] = None,
    ) -> None:
        """Forward a log entry to every wrapped span with one shared timestamp."""
        moment = timestamp if timestamp else utc_now()
        for sub_span in self.tracers:
            sub_span.log(message, value, moment)

    def end(self, timestamp: Optional[datetime] = None) -> None:
        """End every wrapped span with one shared timestamp."""
        moment = timestamp if timestamp else utc_now()
        for sub_span in self.tracers:
            sub_span.end(moment)


class CompositeTaskSpan(CompositeSpan[TaskSpan], TaskSpan):
    """A :class:`TaskSpan` that fans every call out to a group of wrapped task spans.

    Log entries, spans and the recorded output are forwarded to each wrapped
    task span.

    Args:
        tracers: task spans that will be forwarded all subsequent log and span calls.
    """

    def record_output(self, output: PydanticSerializable) -> None:
        """Record the same output on every wrapped task span."""
        for task_span in self.tracers:
            task_span.record_output(output)
134 changes: 134 additions & 0 deletions src/intelligence_layer/core/tracer/in_memory_tracer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
from datetime import datetime
from typing import Optional, Union

from pydantic import BaseModel, Field, SerializeAsAny
from rich.tree import Tree

from intelligence_layer.core import (
LogEntry,
PydanticSerializable,
Span,
TaskSpan,
Tracer,
utc_now,
)
from intelligence_layer.core.tracer.tracer import _render_log_value


class InMemoryTracer(BaseModel, Tracer):
    """Keeps log entries in memory as a nested structure.

    The structure is a Pydantic model, so it can be serialized to JSON and
    written to a file, returned via an API, or similar.

    Attributes:
        entries: A sequential list of log entries and/or nested in-memory spans
            with their own log entries.
    """

    entries: list[Union[LogEntry, "InMemoryTaskSpan", "InMemorySpan"]] = []

    def span(
        self,
        name: str,
        timestamp: Optional[datetime] = None,
        trace_id: Optional[str] = None,
    ) -> "InMemorySpan":
        """Create a child :class:`InMemorySpan`, append it to ``entries`` and return it."""
        started_at = timestamp if timestamp else utc_now()
        new_span = InMemorySpan(
            name=name,
            start_timestamp=started_at,
            trace_id=self.ensure_id(trace_id),
        )
        self.entries.append(new_span)
        return new_span

    def task_span(
        self,
        task_name: str,
        input: PydanticSerializable,
        timestamp: Optional[datetime] = None,
        trace_id: Optional[str] = None,
    ) -> "InMemoryTaskSpan":
        """Create a child :class:`InMemoryTaskSpan`, append it to ``entries`` and return it."""
        started_at = timestamp if timestamp else utc_now()
        new_task_span = InMemoryTaskSpan(
            name=task_name,
            input=input,
            start_timestamp=started_at,
            trace_id=self.ensure_id(trace_id),
        )
        self.entries.append(new_task_span)
        return new_task_span

    def _rich_render_(self) -> Tree:
        """Renders the trace via classes in the `rich` package"""
        root = Tree(label="Trace")
        for entry in self.entries:
            root.add(entry._rich_render_())
        return root

    def _ipython_display_(self) -> None:
        """Default rendering for Jupyter notebooks"""
        # Imported locally and aliased so the builtin `print` is not shadowed.
        from rich import print as rich_print

        rendered = self._rich_render_()
        rich_print(rendered)


class InMemorySpan(InMemoryTracer, Span):
    """A :class:`Span` that keeps its log entries and nested spans in memory.

    Attributes:
        name: Name of the span; used as the label when rendering.
        start_timestamp: When the span started.
        end_timestamp: When the span ended; ``None`` until :meth:`end` is called.
        trace_id: Id returned by :meth:`id` and attached to each log entry.
    """

    name: str
    # Default to the same `utc_now` helper that supplies every other timestamp
    # in this module, instead of `datetime.utcnow`, which is deprecated since
    # Python 3.12 and returns a naive datetime.
    start_timestamp: datetime = Field(default_factory=utc_now)
    end_timestamp: Optional[datetime] = None
    trace_id: str

    def id(self) -> str:
        """Return the trace id of this span."""
        return self.trace_id

    def log(
        self,
        message: str,
        value: PydanticSerializable,
        timestamp: Optional[datetime] = None,
    ) -> None:
        """Append a :class:`LogEntry` carrying this span's trace id to ``entries``.

        Args:
            message: Message stored on the log entry.
            value: Value stored on the log entry.
            timestamp: Time of the entry; defaults to ``utc_now()``.
        """
        self.entries.append(
            LogEntry(
                message=message,
                value=value,
                timestamp=timestamp or utc_now(),
                trace_id=self.id(),
            )
        )

    def end(self, timestamp: Optional[datetime] = None) -> None:
        """Set ``end_timestamp`` unless the span has already been ended.

        Args:
            timestamp: End time; defaults to ``utc_now()``.
        """
        # Only the first call takes effect; later calls keep the original end time.
        if self.end_timestamp is None:
            self.end_timestamp = timestamp or utc_now()

    def _rich_render_(self) -> Tree:
        """Renders the trace via classes in the `rich` package"""
        tree = Tree(label=self.name)

        for log in self.entries:
            tree.add(log._rich_render_())

        return tree


class InMemoryTaskSpan(InMemorySpan, TaskSpan):
    """A :class:`TaskSpan` kept in memory, holding the task's input and output.

    Attributes:
        input: The task input given when the span was created.
        output: The task output; ``None`` until :meth:`record_output` is called.
    """

    input: SerializeAsAny[PydanticSerializable]
    output: Optional[SerializeAsAny[PydanticSerializable]] = None

    def record_output(self, output: PydanticSerializable) -> None:
        """Store ``output`` on this span."""
        self.output = output

    def _rich_render_(self) -> Tree:
        """Renders the trace via classes in the `rich` package"""
        root = Tree(label=self.name)

        # Order of the rendered children: input, then all entries, then output.
        root.add(_render_log_value(self.input, "Input"))
        for entry in self.entries:
            root.add(entry._rich_render_())
        root.add(_render_log_value(self.output, "Output"))

        return root
109 changes: 109 additions & 0 deletions src/intelligence_layer/core/tracer/open_telemetry_tracer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
from datetime import datetime
from typing import Optional

from opentelemetry.context import attach, detach
from opentelemetry.trace import Span as OpenTSpan
from opentelemetry.trace import Tracer as OpenTTracer
from opentelemetry.trace import set_span_in_context

from intelligence_layer.core.tracer.tracer import (
PydanticSerializable,
Span,
TaskSpan,
Tracer,
_serialize,
)


class OpenTelemetryTracer(Tracer):
    """A `Tracer` that uses open telemetry."""

    def __init__(self, tracer: OpenTTracer) -> None:
        self._tracer = tracer

    def span(
        self,
        name: str,
        timestamp: Optional[datetime] = None,
        trace_id: Optional[str] = None,
    ) -> "OpenTelemetrySpan":
        """Start an open telemetry span, attach it to the context and wrap it."""
        resolved_id = self.ensure_id(trace_id)
        attributes = {"trace_id": resolved_id}
        # With `start_time=None` the start time is left to open telemetry.
        start_ns = _open_telemetry_timestamp(timestamp) if timestamp else None
        otel_span = self._tracer.start_span(
            name,
            attributes=attributes,
            start_time=start_ns,
        )
        context_token = attach(set_span_in_context(otel_span))
        return OpenTelemetrySpan(otel_span, self._tracer, context_token, resolved_id)

    def task_span(
        self,
        task_name: str,
        input: PydanticSerializable,
        timestamp: Optional[datetime] = None,
        trace_id: Optional[str] = None,
    ) -> "OpenTelemetryTaskSpan":
        """Start an open telemetry span carrying the serialized task input and wrap it."""
        resolved_id = self.ensure_id(trace_id)
        attributes = {"input": _serialize(input), "trace_id": resolved_id}
        # With `start_time=None` the start time is left to open telemetry.
        start_ns = _open_telemetry_timestamp(timestamp) if timestamp else None
        otel_span = self._tracer.start_span(
            task_name,
            attributes=attributes,
            start_time=start_ns,
        )
        context_token = attach(set_span_in_context(otel_span))
        return OpenTelemetryTaskSpan(
            otel_span, self._tracer, context_token, resolved_id
        )


class OpenTelemetrySpan(Span, OpenTelemetryTracer):
    """A `Span` created by `OpenTelemetryTracer.span`."""

    end_timestamp: Optional[datetime] = None

    def __init__(
        self, span: OpenTSpan, tracer: OpenTTracer, token: object, trace_id: str
    ) -> None:
        super().__init__(tracer)
        self.open_ts_span = span
        # Token returned by `attach`; handed back to `detach` in `end`.
        self._token = token
        self._trace_id = trace_id

    def id(self) -> str:
        """Return the trace id this span was created with."""
        return self._trace_id

    def log(
        self,
        message: str,
        value: PydanticSerializable,
        timestamp: Optional[datetime] = None,
    ) -> None:
        """Add an event with the serialized value and trace id to the wrapped span."""
        event_attributes = {"value": _serialize(value), "trace_id": self.id()}
        event_time = _open_telemetry_timestamp(timestamp) if timestamp else None
        self.open_ts_span.add_event(message, event_attributes, event_time)

    def end(self, timestamp: Optional[datetime] = None) -> None:
        """Detach the context token and end the wrapped span."""
        detach(self._token)
        if timestamp is None:
            end_ns = None
        else:
            end_ns = _open_telemetry_timestamp(timestamp)
        self.open_ts_span.end(end_ns)


class OpenTelemetryTaskSpan(TaskSpan, OpenTelemetrySpan):
    """A `TaskSpan` created by `OpenTelemetryTracer.task_span`."""

    output: Optional[PydanticSerializable] = None

    def __init__(
        self, span: OpenTSpan, tracer: OpenTTracer, token: object, trace_id: str
    ) -> None:
        # Pure pass-through to the next initializer in the MRO.
        super().__init__(span, tracer, token, trace_id)

    def record_output(self, output: PydanticSerializable) -> None:
        """Store the serialized output as the `output` attribute of the wrapped span."""
        serialized_output = _serialize(output)
        self.open_ts_span.set_attribute("output", serialized_output)


def _open_telemetry_timestamp(t: datetime) -> int:
# Open telemetry expects *nanoseconds* since epoch
t_float = t.timestamp() * 1e9
return int(t_float)
Loading

0 comments on commit 9332b16

Please sign in to comment.