Skip to content

Commit

Permalink
wip: add tests for the new trace export
Browse files Browse the repository at this point in the history
  • Loading branch information
NiklasKoehneckeAA committed May 15, 2024
1 parent b7e1b89 commit b224062
Show file tree
Hide file tree
Showing 4 changed files with 172 additions and 9 deletions.
8 changes: 5 additions & 3 deletions src/intelligence_layer/core/tracer/composite_tracer.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
from datetime import datetime
from typing import Generic, Optional, Sequence
from typing import Generic, Optional, Sequence, TypeVar

from intelligence_layer.core.tracer.tracer import (
PydanticSerializable,
Span,
SpanVar,
TaskSpan,
Tracer,
TracerVar,
utc_now,
)

TracerVar = TypeVar("TracerVar", bound=Tracer)

SpanVar = TypeVar("SpanVar", bound=Span)


class CompositeTracer(Tracer, Generic[TracerVar]):
"""A :class:`Tracer` that allows for recording to multiple tracers simultaneously.
Expand Down
60 changes: 54 additions & 6 deletions src/intelligence_layer/core/tracer/tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
from abc import ABC, abstractmethod
from contextlib import AbstractContextManager
from datetime import datetime, timezone
from enum import Enum
from types import TracebackType
from typing import TYPE_CHECKING, Mapping, Optional, Sequence, TypeVar
from typing import TYPE_CHECKING, Mapping, Optional, Sequence
from uuid import UUID, uuid4

from pydantic import BaseModel, Field, RootModel, SerializeAsAny
Expand Down Expand Up @@ -50,6 +51,46 @@ def utc_now() -> datetime:
return datetime.now(timezone.utc)


class Event:
name: str
message: str
body: SerializeAsAny[PydanticSerializable]
timestamp: datetime = Field(default_factory=utc_now)


class SpanType(Enum):
SPAN = "SPAN"
TASK_SPAN = "TASK_SPAN"


class SpanAttributes(BaseModel):
type: SpanType = SpanType.SPAN


class TaskSpanAttributes(SpanAttributes):
type: SpanType = SpanType.TASK_SPAN
input: SerializeAsAny[PydanticSerializable]
output: SerializeAsAny[PydanticSerializable]


class SpanStatus(Enum):
OK = "OK"
ERROR = "ERROR"


class ExportedSpan:
id: str
# we ignore context as we only need the id from it
name: str | None
parent_id: str | None
start_time: datetime
end_time: datetime
attributes: SpanAttributes
events: Sequence[Event]
status: SpanStatus
# we ignore the links concept


class Tracer(ABC):
"""Provides a consistent way to instrument a :class:`Task` with logging for each step of the
workflow.
Expand Down Expand Up @@ -205,6 +246,18 @@ def __exit__(
self.log(error_value.message, error_value)
self.end()

@abstractmethod
def export_for_viewing(self) -> Sequence[ExportedSpan]:
"""Converts the span to a format that can be read by the trace viewer.
The format is inspired by the OpenTelemetry Format, but does not abide by it,
because it is too complex for our use-case.
Returns:
A list of spans which includes the current span and all its child spans.
"""
...


class TaskSpan(Span):
"""Specialized span for instrumenting :class:`Task` input, output, and nested spans and logs.
Expand Down Expand Up @@ -244,11 +297,6 @@ def __exit__(
self.end()


TracerVar = TypeVar("TracerVar", bound=Tracer)

SpanVar = TypeVar("SpanVar", bound=Span)


class NoOpTracer(TaskSpan):
"""A no-op tracer.
Expand Down
18 changes: 18 additions & 0 deletions tests/core/tracer/conftest.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
from pathlib import Path

from pytest import fixture

from intelligence_layer.core import Task, TaskSpan
from intelligence_layer.core.tracer.file_tracer import FileTracer
from intelligence_layer.core.tracer.in_memory_tracer import InMemoryTracer


class TestSubTask(Task[None, None]):
Expand All @@ -19,6 +23,20 @@ def do_run(self, input: str, task_span: TaskSpan) -> str:
return "output"


class TestException(Exception):
pass


@fixture
def test_task() -> Task[str, str]:
return TestTask()


@fixture
def file_tracer(tmp_path: Path) -> FileTracer:
return FileTracer(tmp_path / "log.log")


@fixture
def in_memory_tracer() -> InMemoryTracer:
return InMemoryTracer()
95 changes: 95 additions & 0 deletions tests/core/tracer/test_tracer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
from pydantic import BaseModel

from intelligence_layer.core import InMemoryTracer
from intelligence_layer.core.tracer.tracer import (
SpanStatus,
SpanType,
TaskSpanAttributes,
utc_now,
)
from tests.core.tracer.conftest import TestException


class DummyObject(BaseModel):
content: str


def test_tracer_exports_spans_to_unified_format() -> None:
tracer = InMemoryTracer()
dummy_object = DummyObject(content="cool")
with tracer.span("name") as temp_span:
temp_span.log("test", dummy_object)

unified_format = tracer.export_for_viewing()

assert len(unified_format) == 1
span = unified_format[0]
assert span.name == "name"
assert span.start_time < span.end_time < utc_now()
assert span.attributes.type == SpanType.SPAN
assert span.status == SpanStatus.OK

assert len(span.events) == 1
log = span.events[0]
assert log.message == "test"
assert log.body == dummy_object
assert span.start_time < log.timestamp < span.end_time


def test_tracer_exports_task_spans_to_unified_format() -> None:
tracer = InMemoryTracer()

with tracer.task_span("name", "input") as task_span:
task_span.record_output("output")

unified_format = tracer.export_for_viewing()

assert len(unified_format) == 1
span = unified_format[0]
assert span.name == "name"
assert span.parent_id is None
assert span.start_time < span.end_time < utc_now()
assert span.attributes.type == SpanType.TASK_SPAN
assert isinstance(span.attributes, TaskSpanAttributes) # for mypy
assert span.attributes.input == "input"
assert span.attributes.output == "output"
assert span.status == SpanStatus.OK


def test_tracer_exports_error_correctly() -> None:
tracer = InMemoryTracer()
try:
with tracer.span("name"):
raise TestException
except TestException:
pass
unified_format = tracer.export_for_viewing()

assert len(unified_format) == 1
span = unified_format[0]
assert span.name == "name"
assert span.parent_id is None
assert span.start_time < span.end_time < utc_now()
assert span.attributes.type == SpanType.SPAN
assert span.status == SpanStatus.ERROR


def test_tracer_export_nests_correctly() -> None:
tracer = InMemoryTracer()
with tracer.span("name") as parent_span:
with parent_span.span("name-2") as child_span:
child_span.log("", value="")

unified_format = tracer.export_for_viewing()

assert len(unified_format) == 2
parent, child = unified_format[0], unified_format[1]
if parent.parent_id is not None:
parent, child = child, parent
assert parent.name == "name"
assert parent.parent_id is None
assert parent.end_time > child.end_time
assert parent.start_time < child.start_time
assert child.name == "name-2"
assert child.parent_id == parent.id
assert len(child.events) == 0

0 comments on commit b224062

Please sign in to comment.