Commit 0d4bb68: Add pydoc and catch Task.run exception

NickyHavoc committed Nov 23, 2023
1 parent 29eb873 commit 0d4bb68

Showing 3 changed files with 187 additions and 52 deletions.
5 changes: 3 additions & 2 deletions src/intelligence_layer/core/__init__.py
@@ -14,12 +14,13 @@
Language,
)
from .echo import EchoInput, EchoOutput, EchoTask
from .evaluator import AggregatedEvaluation
from .evaluator import Dataset as Dataset
from .evaluator import Evaluation
from .evaluator import EvaluationException as EvaluationException
from .evaluator import EvaluationRepository as EvaluationRepository
from .evaluator import Evaluator as Evaluator
from .evaluator import ExampleResult as ExampleResult
from .evaluator import EvaluationException as EvaluationException
from .evaluator import EvaluationRunOverview as EvaluationRunOverview
from .evaluator import Example as Example
from .evaluator import InMemoryEvaluationRepository as InMemoryEvaluationRepository
from .evaluator import LogTrace as LogTrace
186 changes: 155 additions & 31 deletions src/intelligence_layer/core/evaluator.py
@@ -43,7 +43,7 @@ class Example(BaseModel, Generic[Input, ExpectedOutput]):
"""Example case used for evaluations.
Attributes:
input: Input for the task. Has to be same type as the input for the task used.
input: Input for the :class:`Task`. Has to be same type as the input for the task used.
expected_output: The expected output from a given example run.
The evaluator will compare the received output against this.
id: Identifier for the example, defaults to uuid.
@@ -55,11 +55,11 @@ class Example(BaseModel, Generic[Input, ExpectedOutput]):


class Dataset(Protocol, Generic[Input, ExpectedOutput]):
"""A dataset of examples used for evaluation of a task.
"""A dataset of examples used for evaluation of a :class:`Task`.
Attributes:
name: This is a human-readable identifier for a dataset.
examples: The actual examples that a task will be evaluated on.
examples: The actual examples that a :class:`Task` will be evaluated on.
"""

@property
@@ -72,18 +72,41 @@ def examples(self) -> Iterable[Example[Input, ExpectedOutput]]:


class SequenceDataset(BaseModel, Generic[Input, ExpectedOutput]):
"""A :class:`Dataset` that contains all examples in a sequence.
We recommend using this when it is certain that all examples
fit in memory.
Attributes:
name: This is a human-readable identifier for a :class:`Dataset`.
examples: The actual examples that a :class:`Task` will be evaluated on.
"""

name: str
examples: Sequence[Example[Input, ExpectedOutput]]

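As a quick illustration of the two models above, here is a minimal sketch of
building an in-memory dataset. It assumes `SequenceDataset` is exported from
`intelligence_layer.core` alongside `Example`; only the latter is visible in
the import diff above.

    from intelligence_layer.core import Example, SequenceDataset

    # Each example pairs a task input with the output we expect back.
    examples = [
        Example(input="What is 2 + 2?", expected_output="4"),
        Example(input="Name the capital of France.", expected_output="Paris"),
    ]

    # A SequenceDataset keeps every example in memory, fine for small suites.
    dataset = SequenceDataset(name="smoke-test", examples=examples)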

class EvaluationException(BaseModel):
"""Captures an exception raised during evaluating a :class:`Task`.
Attributes:
error_message: String-representation of the exception.
"""

error_message: str


Trace = Union["TaskSpanTrace", "SpanTrace", "LogTrace"]


class SpanTrace(BaseModel):
"""Represents traces contained by :class:`Span`
Attributes:
traces: The child traces.
start: Start time of the span.
end: End time of the span.
"""
model_config = ConfigDict(frozen=True)

traces: Sequence[Trace]
@@ -93,7 +116,7 @@ class SpanTrace(BaseModel):
@staticmethod
def from_span(span: InMemorySpan) -> "SpanTrace":
return SpanTrace(
traces=[to_trace_entry(t) for t in span.entries],
traces=[_to_trace_entry(t) for t in span.entries],
start=span.start_timestamp,
end=span.end_timestamp,
)
@@ -106,6 +129,12 @@ def _rich_render_(self) -> Tree:


class TaskSpanTrace(SpanTrace):
"""Represents traces contained by :class:`TaskSpan`
Attributes:
input: Input from the traced :class:`Task`.
output: Output of the traced :class:`Task`.
"""
model_config = ConfigDict(frozen=True)

input: SerializeAsAny[JsonSerializable]
@@ -114,7 +143,7 @@ class TaskSpanTrace(SpanTrace):
@staticmethod
def from_task_span(task_span: InMemoryTaskSpan) -> "TaskSpanTrace":
return TaskSpanTrace(
traces=[to_trace_entry(t) for t in task_span.entries],
traces=[_to_trace_entry(t) for t in task_span.entries],
start=task_span.start_timestamp,
end=task_span.end_timestamp,
# RootModel.model_dump is declared to return the type of root, but actually returns
@@ -139,6 +168,15 @@ def _ipython_display_(self) -> None:


class LogTrace(BaseModel):
"""Represents a :class:`LogEntry`.
Attributes:
message: A description of the value that is being logged, such as the step in the
:class:`Task` this is related to.
value: The logged data. Can be anything that is serializable by Pydantic,
which gives the tracers flexibility in how they store and emit the logs.
"""

model_config = ConfigDict(frozen=True)

message: str
@@ -164,7 +202,7 @@ def _render_log_value(value: JsonSerializable, title: str) -> Panel:
)


def to_trace_entry(entry: InMemoryTaskSpan | InMemorySpan | LogEntry) -> Trace:
def _to_trace_entry(entry: InMemoryTaskSpan | InMemorySpan | LogEntry) -> Trace:
if isinstance(entry, InMemoryTaskSpan):
return TaskSpanTrace.from_task_span(entry)
elif isinstance(entry, InMemorySpan):
@@ -174,12 +212,33 @@ def to_trace_entry(entry: InMemoryTaskSpan | InMemorySpan | LogEntry) -> Trace:


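The tail of `_to_trace_entry` is collapsed in this diff. Based on the `Trace`
union and the `LogTrace` fields above, the remaining branch plausibly handles
`LogEntry`; the following is a reconstruction under that assumption, not the
commit's verbatim code:

    def _to_trace_entry(entry: InMemoryTaskSpan | InMemorySpan | LogEntry) -> Trace:
        if isinstance(entry, InMemoryTaskSpan):
            return TaskSpanTrace.from_task_span(entry)
        elif isinstance(entry, InMemorySpan):
            return SpanTrace.from_span(entry)
        else:
            # A LogEntry carries a message and a JSON-serializable value.
            return LogTrace(message=entry.message, value=entry.value)
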
class ExampleResult(BaseModel, Generic[Evaluation]):
"""Result of a single evaluated :class:`Example`
Created to persist the evaluation result in the repository.
Attributes:
example_id: Identifier of the :class:`Example` evaluated.
result: If the evaluation was successful, the evaluation's result;
otherwise, the exception raised while running or evaluating
the :class:`Task`.
trace: Generated when running the :class:`Task`.
"""

example_id: str
result: SerializeAsAny[Evaluation | EvaluationException]
trace: TaskSpanTrace


class EvaluationRunOverview(BaseModel, Generic[AggregatedEvaluation]):
"""Overview of the results of evaluating a :class:`Task` on a :class:`Dataset`.
Created when running :func:`Evaluator.evaluate_dataset`. Contains high-level information and statistics.
Attributes:
id: Identifier of the run.
statistics: Aggregated statistics of the run.
"""

id: str
# dataset_id: str
# failed_evaluation_count: int
@@ -190,34 +249,82 @@ class EvaluationRunOverview(BaseModel, Generic[AggregatedEvaluation]):


class EvaluationRepository(ABC):
"""Base evaluation repository interface.
Provides methods to store and load evaluation results for individual examples
of a run and the aggregated evaluation of said run.
"""

@abstractmethod
def evaluation_run_results(
self, run_id: str, evaluation_type: type[Evaluation]
) -> Sequence[ExampleResult[Evaluation]]:
"""Returns all :class:`ExampleResult` instances of a given run
Args:
run_id: Identifier of the run to obtain the results for.
evaluation_type: Type of evaluations that the :class:`Evaluator` returned
in :func:`Evaluator.do_evaluate`
Returns:
All :class:`ExampleResult` of the run. Will return an empty list if there's none.
"""
...

@abstractmethod
def evaluation_example_result(
self, run_id: str, example_id: str, evaluation_type: type[Evaluation]
) -> Optional[ExampleResult[Evaluation]]:
"""Returns an :class:`ExampleResult` of a given run by its id.
Args:
run_id: Identifier of the run to obtain the results for.
example_id: Identifier of the :class:`ExampleResult` to be retrieved.
evaluation_type: Type of evaluations that the :class:`Evaluator` returned
in :func:`Evaluator.do_evaluate`.
Returns:
:class:`ExampleResult` if one was found, `None` otherwise.
"""
...

@abstractmethod
def store_example_result(
self, run_id: str, result: ExampleResult[Evaluation]
) -> None:
"""Stores an :class:`ExampleResult` for a run in the repository.
Args:
run_id: Identifier of the run.
result: The result to be persisted.
"""
...

@abstractmethod
def evaluation_run_overview(
self, run_id: str, aggregation_type: type[AggregatedEvaluation]
) -> Optional[EvaluationRunOverview[AggregatedEvaluation]]:
"""Returns an :class:`EvaluationRunOverview` of a given run by its id.
Args:
run_id: Identifier of the run to obtain the overview for.
aggregation_type: Type of aggregations that the :class:`Evaluator` returned
in :func:`Evaluator.aggregate`.
Returns:
:class:`EvaluationRunOverview` if one was found, `None` otherwise.
"""
...

@abstractmethod
def store_evaluation_run_overview(
self, overview: EvaluationRunOverview[AggregatedEvaluation]
) -> None:
"""Stores an :class:`EvaluationRunOverview` in the repository.
Args:
overview: The overview to be persisted.
"""
...

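The package ships an `InMemoryEvaluationRepository` (see the `__init__.py`
diff above). Purely as a sketch of this contract, a toy dict-backed
implementation could look like the following; it ignores the
`evaluation_type`/`aggregation_type` hints because it stores live objects
rather than serialized JSON:

    from collections import defaultdict
    from typing import Optional, Sequence


    class DictEvaluationRepository(EvaluationRepository):
        """Toy repository that keeps results and overviews in plain dicts."""

        def __init__(self) -> None:
            self._results: dict[str, dict[str, ExampleResult]] = defaultdict(dict)
            self._overviews: dict[str, EvaluationRunOverview] = {}

        def evaluation_run_results(
            self, run_id: str, evaluation_type: type
        ) -> Sequence[ExampleResult]:
            return list(self._results[run_id].values())

        def evaluation_example_result(
            self, run_id: str, example_id: str, evaluation_type: type
        ) -> Optional[ExampleResult]:
            return self._results[run_id].get(example_id)

        def store_example_result(self, run_id: str, result: ExampleResult) -> None:
            self._results[run_id][result.example_id] = result

        def evaluation_run_overview(
            self, run_id: str, aggregation_type: type
        ) -> Optional[EvaluationRunOverview]:
            return self._overviews.get(run_id)

        def store_evaluation_run_overview(
            self, overview: EvaluationRunOverview
        ) -> None:
            self._overviews[overview.id] = overview
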

@@ -309,13 +416,14 @@ class Evaluator(
):
"""Base evaluator interface. This should run certain evaluation steps for some job.
We suggest supplying a `Task` in the `__init__` method and running it in the `evaluate` method.
We suggest supplying a :class:`Task` in the `__init__` method and running it in the :func:`Evaluator.evaluate` method.
Generics:
Input: Interface to be passed to the task that shall be evaluated.
ExpectedOutput: Output that is expected from the task run with the supplied input.
Evaluation: Interface of the metrics that come from the evaluated task.
AggregatedEvaluation: The aggregated results of an evaluation run with a dataset.
Input: Interface to be passed to the :class:`Task` that shall be evaluated.
Output: Type of the output of the :class:`Task` to be evaluated.
ExpectedOutput: Output that is expected from the run with the supplied input.
Evaluation: Interface of the metrics that come from the evaluated :class:`Task`.
AggregatedEvaluation: The aggregated results of an evaluation run with a :class:`Dataset`.
"""

def __init__(
@@ -333,20 +441,21 @@ def do_evaluate(
) -> Evaluation:
"""Executes the evaluation for this use-case.
The implementation of this method is responsible for running a task (usually supplied by the __init__ method)
and making any comparisons relevant to the evaluation.
The implementation of this method is responsible for running a :class:`Task` (usually
supplied by the __init__ method) and making any comparisons relevant to the evaluation.
Based on the results, it should create an `Evaluation` object with all the metrics and return it.
Args:
input: The input that was passed to the task to produce the output
output: Output of the task that shall be evaluated
expected_output: Output that is expected from the task.
input: The input that was passed to the :class:`Task` to produce the output
output: Output of the :class:`Task` that shall be evaluated
expected_output: Output that is compared to the generated output.
Returns:
Interface of the metrics that come from the evaluated task.
The metrics that come from the evaluated :class:`Task`.
"""
pass
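
For a concrete feel of this hook, a hypothetical exact-match use case might
implement `do_evaluate` (and the matching `aggregate`, whose stub appears
further down) as below. The two pydantic models and the order of the generic
parameters are assumptions for illustration:

    from typing import Iterable

    from pydantic import BaseModel


    class ExactMatchEvaluation(BaseModel):
        correct: bool


    class ExactMatchAggregation(BaseModel):
        accuracy: float


    class ExactMatchEvaluator(
        Evaluator[str, str, str, ExactMatchEvaluation, ExactMatchAggregation]
    ):
        def do_evaluate(
            self, input: str, output: str, expected_output: str
        ) -> ExactMatchEvaluation:
            # The metric is simply whether the task reproduced the expected text.
            return ExactMatchEvaluation(correct=output == expected_output)

        def aggregate(
            self, evaluations: Iterable[ExactMatchEvaluation]
        ) -> ExactMatchAggregation:
            results = list(evaluations)
            correct = sum(1 for e in results if e.correct)
            return ExactMatchAggregation(
                accuracy=correct / len(results) if results else 0.0
            )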

def _evaluate(
def _evaluate_example(
self,
run_id: str,
example: Example[Input, ExpectedOutput],
@@ -368,8 +477,23 @@ def evaluate(
def evaluate(
self, input: Input, expected_output: ExpectedOutput, tracer: Tracer
) -> Evaluation | EvaluationException:
output = self.task.run(input, tracer)
"""Evaluates a single example and returns an `Evaluation` or `EvaluationException`.
This will call the `run` method for the :class:`Task` defined in the `__init__` method.
It catches any errors that occur during :func:`Task.run` or :func:`Evaluator.do_evaluate`.
Args:
input: Input for the :class:`Task`. Has to be same type as the input for the task used.
expected_output: The expected output for the run.
The evaluator will compare the received output against this.
tracer: :class:`Tracer` used for tracing.
Returns:
Result of the evaluation, or the exception raised if an error
occurred while running the :class:`Task` or evaluating its output.
"""

try:
output = self.task.run(input, tracer)
return self.do_evaluate(input, output, expected_output)
except Exception as e:
return EvaluationException(error_message=str(e))
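
The effect of the new try/except is that a crashing task yields an
`EvaluationException` result instead of aborting the whole run. A sketch of
that behavior, reusing the hypothetical `ExactMatchEvaluator` and
`DictEvaluationRepository` from above; the failing task and the `Evaluator`
constructor arguments are assumptions, since `__init__` is collapsed in this
diff:

    from intelligence_layer.core import InMemoryTracer, Task


    class ExplodingTask(Task[str, str]):
        def run(self, input: str, tracer) -> str:
            raise RuntimeError("boom")


    evaluator = ExactMatchEvaluator(ExplodingTask(), DictEvaluationRepository())
    result = evaluator.evaluate("hi", "HI", InMemoryTracer())
    assert isinstance(result, EvaluationException)
    assert result.error_message == "boom"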
@@ -378,13 +502,13 @@ def evaluate_dataset(
def evaluate_dataset(
self, dataset: Dataset[Input, ExpectedOutput], tracer: Tracer
) -> EvaluationRunOverview[AggregatedEvaluation]:
"""Evaluates an entire datasets in a threaded manner and aggregates the results into an `AggregatedEvaluation`.
"""Evaluates an entire :class:`Dataset` in a threaded manner and aggregates the results into an `AggregatedEvaluation`.
This will call the `run` method for each example in the dataset.
This will call the `run` method for each example in the :class:`Dataset`.
Finally, it will call the `aggregate` method and return the aggregated results.
Args:
dataset: Dataset that will be used to evaluate a task.
dataset: Dataset that will be used to evaluate a :class:`Task`.
tracer: :class:`Tracer` used for tracing.
Returns:
The aggregated results of an evaluation run with a dataset.
@@ -394,7 +518,7 @@ def evaluate_dataset(
with ThreadPoolExecutor(max_workers=10) as executor:
evaluations = tqdm(
executor.map(
lambda example: self._evaluate(
lambda example: self._evaluate_example(
run_id,
example,
tracer,
@@ -422,8 +546,8 @@ def aggregate(self, evaluations: Iterable[Evaluation]) -> AggregatedEvaluation:
It should create an `AggregatedEvaluation` object and return it at the end.
Args:
evalautions: The results from running `evaluate_dataset` with a task.
evaluations: The results from running `evaluate_dataset` with a :class:`Task`.
Returns:
The aggregated results of an evaluation run with a dataset.
The aggregated results of an evaluation run with a :class:`Dataset`.
"""
pass
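
Putting the pieces together, an end-to-end run over the dataset sketched
earlier might look like this; again, the `Evaluator` constructor arguments
and the toy task are assumptions for illustration:

    from intelligence_layer.core import InMemoryTracer, Task


    class UppercaseTask(Task[str, str]):
        # Toy "model": the output is just the uppercased input.
        def run(self, input: str, tracer) -> str:
            return input.upper()


    evaluator = ExactMatchEvaluator(UppercaseTask(), DictEvaluationRepository())
    overview = evaluator.evaluate_dataset(dataset, InMemoryTracer())

    # The overview carries the run id plus the aggregated statistics.
    print(overview.id, overview.statistics.accuracy)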