diff --git a/src/intelligence_layer/core/__init__.py b/src/intelligence_layer/core/__init__.py index 252c2635..09b247ca 100644 --- a/src/intelligence_layer/core/__init__.py +++ b/src/intelligence_layer/core/__init__.py @@ -5,6 +5,7 @@ from .chunk import ChunkWithIndicesOutput as ChunkWithIndicesOutput from .chunk import ChunkWithStartEndIndices as ChunkWithStartEndIndices from .chunk import TextChunk as TextChunk +from .decorators import trace as trace from .detect_language import DetectLanguage as DetectLanguage from .detect_language import DetectLanguageInput as DetectLanguageInput from .detect_language import DetectLanguageOutput as DetectLanguageOutput diff --git a/src/intelligence_layer/core/decorators.py b/src/intelligence_layer/core/decorators.py new file mode 100644 index 00000000..cea86f1b --- /dev/null +++ b/src/intelligence_layer/core/decorators.py @@ -0,0 +1,69 @@ +"""Module for decorators used in the intelligence layer.""" + +from collections.abc import Callable +from functools import wraps +from typing import Any, ParamSpec, TypeVar, get_type_hints + +from pydantic import BaseModel + +from intelligence_layer.core.tracer.tracer import ( + NoOpTracer, + Tracer, +) + +T = TypeVar("T") +P = ParamSpec("P") + + +class SerializableInput(BaseModel): + """Pydantic model for serializing input data.""" + + args: dict[str, Any] + kwargs: dict[str, Any] + + +def trace(func: Callable[P, T]) -> Callable[P, T]: + """A decorator to trace the execution of a method in a Task subclass. + + This decorator wraps the method execution with tracing logic, creating a task span and recording the output. + It retrieves the tracer from the Task instance and uses the method name as the task name. + + Args: + func: The method to be traced. + + Returns: + Callable: A decorator that traces the function execution. + """ + + @wraps(func) + def wrapper(self: Any, *args: P.args, **kwargs: P.kwargs) -> T: + """Wrapper function to execute the original function with tracing. + + Args: + self: The instance of the Task subclass. + *args: Positional arguments passed to the original function. + **kwargs: Keyword arguments passed to the original function. + + Returns: + PydanticSerializable: The output of the original function. + """ + tracer: Tracer = getattr(self, "_tracer", NoOpTracer()) + name = func.__name__ + + arg_names = list(get_type_hints(func).keys()) + args_dict = {arg_names[i]: arg for i, arg in enumerate(args)} + input_data = SerializableInput(args=args_dict, kwargs=kwargs) + + if self.current_task_span is None: + self.current_task_span = tracer.task_span(name, input_data) + else: + self.current_task_span = self.current_task_span.task_span(name, input_data) + + with self.current_task_span as task_span: + output: T = func(self, *args, **kwargs) + task_span.record_output( + SerializableInput(args={"output": output}, kwargs={}) + ) + return output + + return wrapper # type: ignore diff --git a/src/intelligence_layer/core/task.py b/src/intelligence_layer/core/task.py index fa20395e..d4d134f0 100644 --- a/src/intelligence_layer/core/task.py +++ b/src/intelligence_layer/core/task.py @@ -42,6 +42,18 @@ class Task(ABC, Generic[Input, Output]): Output: Interface of the output returned by the task. """ + _tracer: Tracer + _current_task_span: TaskSpan + + @property + def current_task_span(self) -> TaskSpan: + """The current task span for this task.""" + return self._current_task_span + + @current_task_span.setter + def current_task_span(self, task_span: TaskSpan) -> None: + self._current_task_span = task_span + @abstractmethod def do_run(self, input: Input, task_span: TaskSpan) -> Output: """The implementation for this use case. @@ -74,7 +86,9 @@ def run(self, input: Input, tracer: Tracer) -> Output: Returns: Generic output defined by the task implementation. """ + self._tracer = tracer with tracer.task_span(type(self).__name__, input) as task_span: + self._current_task_span = task_span output = self.do_run(input, task_span) task_span.record_output(output) return output