-
Notifications
You must be signed in to change notification settings - Fork 6
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
refactor(tracer): split large tracer file into multiple specific trac…
…er files TASK: IL-297
- Loading branch information
1 parent
b8206f1
commit 33834f2
Showing
33 changed files
with
556 additions
and
500 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,108 @@ | ||
from datetime import datetime | ||
from typing import Generic, Optional, Sequence | ||
|
||
from intelligence_layer.core import ( | ||
Chunk, | ||
InMemoryTracer, | ||
PydanticSerializable, | ||
Span, | ||
TaskSpan, | ||
Tracer, | ||
utc_now, | ||
) | ||
from intelligence_layer.core.tracer.tracer import SpanVar, TracerVar | ||
from intelligence_layer.use_cases import ClassifyInput, PromptBasedClassify | ||
|
||
|
||
class CompositeTracer(Tracer, Generic[TracerVar]): | ||
"""A :class:`Tracer` that allows for recording to multiple tracers simultaneously. | ||
Each log-entry and span will be forwarded to all subtracers. | ||
Args: | ||
tracers: tracers that will be forwarded all subsequent log and span calls. | ||
Example: | ||
>>> from intelligence_layer.core import InMemoryTracer, FileTracer, CompositeTracer, Chunk | ||
>>> from intelligence_layer.use_cases import PromptBasedClassify, ClassifyInput | ||
>>> tracer_1 = InMemoryTracer() | ||
>>> tracer_2 = InMemoryTracer() | ||
>>> tracer = CompositeTracer([tracer_1, tracer_2]) | ||
>>> task = PromptBasedClassify() | ||
>>> response = task.run(ClassifyInput(chunk=Chunk("Cool"), labels=frozenset({"label", "other label"})), tracer) | ||
""" | ||
|
||
def __init__(self, tracers: Sequence[TracerVar]) -> None: | ||
assert len(tracers) > 0 | ||
self.tracers = tracers | ||
|
||
def span( | ||
self, | ||
name: str, | ||
timestamp: Optional[datetime] = None, | ||
trace_id: Optional[str] = None, | ||
) -> "CompositeSpan[Span]": | ||
timestamp = timestamp or utc_now() | ||
trace_id = self.ensure_id(trace_id) | ||
return CompositeSpan( | ||
[tracer.span(name, timestamp, trace_id) for tracer in self.tracers] | ||
) | ||
|
||
def task_span( | ||
self, | ||
task_name: str, | ||
input: PydanticSerializable, | ||
timestamp: Optional[datetime] = None, | ||
trace_id: Optional[str] = None, | ||
) -> "CompositeTaskSpan": | ||
timestamp = timestamp or utc_now() | ||
trace_id = self.ensure_id(trace_id) | ||
return CompositeTaskSpan( | ||
[ | ||
tracer.task_span(task_name, input, timestamp, trace_id) | ||
for tracer in self.tracers | ||
] | ||
) | ||
|
||
|
||
class CompositeSpan(Generic[SpanVar], CompositeTracer[SpanVar], Span): | ||
"""A :class:`Span` that allows for recording to multiple spans simultaneously. | ||
Each log-entry and span will be forwarded to all subspans. | ||
Args: | ||
tracers: spans that will be forwarded all subsequent log and span calls. | ||
""" | ||
|
||
def id(self) -> str: | ||
return self.tracers[0].id() | ||
|
||
def log( | ||
self, | ||
message: str, | ||
value: PydanticSerializable, | ||
timestamp: Optional[datetime] = None, | ||
) -> None: | ||
timestamp = timestamp or utc_now() | ||
for tracer in self.tracers: | ||
tracer.log(message, value, timestamp) | ||
|
||
def end(self, timestamp: Optional[datetime] = None) -> None: | ||
timestamp = timestamp or utc_now() | ||
for tracer in self.tracers: | ||
tracer.end(timestamp) | ||
|
||
|
||
class CompositeTaskSpan(CompositeSpan[TaskSpan], TaskSpan): | ||
"""A :class:`TaskSpan` that allows for recording to multiple TaskSpans simultaneously. | ||
Each log-entry and span will be forwarded to all subspans. | ||
Args: | ||
tracers: task spans that will be forwarded all subsequent log and span calls. | ||
""" | ||
|
||
def record_output(self, output: PydanticSerializable) -> None: | ||
for tracer in self.tracers: | ||
tracer.record_output(output) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,134 @@ | ||
from datetime import datetime | ||
from typing import Optional, Union | ||
|
||
from pydantic import BaseModel, Field, SerializeAsAny | ||
from rich.tree import Tree | ||
|
||
from intelligence_layer.core import ( | ||
LogEntry, | ||
PydanticSerializable, | ||
Span, | ||
TaskSpan, | ||
Tracer, | ||
utc_now, | ||
) | ||
from intelligence_layer.core.tracer.tracer import _render_log_value | ||
|
||
|
||
class InMemoryTracer(BaseModel, Tracer): | ||
"""Collects log entries in a nested structure, and keeps them in memory. | ||
If desired, the structure is serializable with Pydantic, so you can write out the JSON | ||
representation to a file, or return via an API, or something similar. | ||
Attributes: | ||
name: A descriptive name of what the tracer contains log entries about. | ||
entries: A sequential list of log entries and/or nested InMemoryTracers with their own | ||
log entries. | ||
""" | ||
|
||
entries: list[Union[LogEntry, "InMemoryTaskSpan", "InMemorySpan"]] = [] | ||
|
||
def span( | ||
self, | ||
name: str, | ||
timestamp: Optional[datetime] = None, | ||
trace_id: Optional[str] = None, | ||
) -> "InMemorySpan": | ||
child = InMemorySpan( | ||
name=name, | ||
start_timestamp=timestamp or utc_now(), | ||
trace_id=self.ensure_id(trace_id), | ||
) | ||
self.entries.append(child) | ||
return child | ||
|
||
def task_span( | ||
self, | ||
task_name: str, | ||
input: PydanticSerializable, | ||
timestamp: Optional[datetime] = None, | ||
trace_id: Optional[str] = None, | ||
) -> "InMemoryTaskSpan": | ||
child = InMemoryTaskSpan( | ||
name=task_name, | ||
input=input, | ||
start_timestamp=timestamp or utc_now(), | ||
trace_id=self.ensure_id(trace_id), | ||
) | ||
self.entries.append(child) | ||
return child | ||
|
||
def _rich_render_(self) -> Tree: | ||
"""Renders the trace via classes in the `rich` package""" | ||
tree = Tree(label="Trace") | ||
|
||
for log in self.entries: | ||
tree.add(log._rich_render_()) | ||
|
||
return tree | ||
|
||
def _ipython_display_(self) -> None: | ||
"""Default rendering for Jupyter notebooks""" | ||
from rich import print | ||
|
||
print(self._rich_render_()) | ||
|
||
|
||
class InMemorySpan(InMemoryTracer, Span): | ||
name: str | ||
start_timestamp: datetime = Field(default_factory=datetime.utcnow) | ||
end_timestamp: Optional[datetime] = None | ||
trace_id: str | ||
|
||
def id(self) -> str: | ||
return self.trace_id | ||
|
||
def log( | ||
self, | ||
message: str, | ||
value: PydanticSerializable, | ||
timestamp: Optional[datetime] = None, | ||
) -> None: | ||
self.entries.append( | ||
LogEntry( | ||
message=message, | ||
value=value, | ||
timestamp=timestamp or utc_now(), | ||
trace_id=self.id(), | ||
) | ||
) | ||
|
||
def end(self, timestamp: Optional[datetime] = None) -> None: | ||
if not self.end_timestamp: | ||
self.end_timestamp = timestamp or utc_now() | ||
|
||
def _rich_render_(self) -> Tree: | ||
"""Renders the trace via classes in the `rich` package""" | ||
tree = Tree(label=self.name) | ||
|
||
for log in self.entries: | ||
tree.add(log._rich_render_()) | ||
|
||
return tree | ||
|
||
|
||
class InMemoryTaskSpan(InMemorySpan, TaskSpan): | ||
input: SerializeAsAny[PydanticSerializable] | ||
output: Optional[SerializeAsAny[PydanticSerializable]] = None | ||
|
||
def record_output(self, output: PydanticSerializable) -> None: | ||
self.output = output | ||
|
||
def _rich_render_(self) -> Tree: | ||
"""Renders the trace via classes in the `rich` package""" | ||
tree = Tree(label=self.name) | ||
|
||
tree.add(_render_log_value(self.input, "Input")) | ||
|
||
for log in self.entries: | ||
tree.add(log._rich_render_()) | ||
|
||
tree.add(_render_log_value(self.output, "Output")) | ||
|
||
return tree |
Oops, something went wrong.