
Commit

feat: Add IncrementalEvaluator and IncrementalEvaluationLogic (#823)
Task: IL-315

Co-authored-by: FelixFehse <[email protected]>
SebastianNiehusAA and FelixFehse authored May 3, 2024
1 parent c32889b commit 8900b7c
Showing 3 changed files with 285 additions and 2 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
@@ -7,7 +7,8 @@
...

### New Features
...
- Add new `IncrementalEvaluator` for easier addition of runs to existing evaluations without repeated evaluation.
- Add `IncrementalEvaluationLogic` for use in `IncrementalEvaluator`.

### Fixes
...
157 changes: 156 additions & 1 deletion src/intelligence_layer/evaluation/evaluation/evaluator.py
@@ -65,6 +65,62 @@ def do_evaluate(
pass


class IncrementalEvaluationLogic(
EvaluationLogic[Input, Output, ExpectedOutput, Evaluation]
):
def __init__(self) -> None:
super().__init__()
self._previous_run_output_ids: list[set[str]] = []

def set_previous_run_output_ids(
self, previous_run_output_ids: list[set[str]]
) -> None:
self._previous_run_output_ids = previous_run_output_ids

def do_evaluate(
self,
example: Example[Input, ExpectedOutput],
*outputs: SuccessfulExampleOutput[Output],
) -> Evaluation:
"""Executes the evaluation for this specific example.
Responsible for comparing the input & expected output of a task to the
actually generated output. The difference to the standard :class:`EvaluationLogic`'s `do_evaluate` is that
this method will separate already processed evaluation from new ones before handing them over to
`do_incremental_evaluate`.
Args:
example: Input data of :class:`Task` to produce the output.
outputs: Outputs of the :class:`Task`.
Returns:
:class:`Evaluation`: The metrics that come from the evaluated :class:`Task`.
"""
flattened_run_output_ids: set[str] = set()
evaluated_outputs = []
for run_output_ids in self._previous_run_output_ids:
flattened_run_output_ids = flattened_run_output_ids.union(run_output_ids)
evaluated_outputs.append(
[output for output in outputs if output.run_id in run_output_ids]
)

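        # Outputs whose run id did not appear in any previous evaluation are the new ones.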
new_outputs = [
output
for output in outputs
if output.run_id not in flattened_run_output_ids
]
return self.do_incremental_evaluate(example, new_outputs, evaluated_outputs)

@abstractmethod
def do_incremental_evaluate(
self,
example: Example[Input, ExpectedOutput],
outputs: list[SuccessfulExampleOutput[Output]],
already_evaluated_outputs: list[list[SuccessfulExampleOutput[Output]]],
) -> Evaluation:
pass


class SingleOutputEvaluationLogic(
EvaluationLogic[Input, Output, ExpectedOutput, Evaluation]
):
@@ -221,7 +277,6 @@ def evaluation_type(self) -> type[Evaluation]:
)
return cast(type[Evaluation], evaluation_type)

@final
def evaluate_runs(
self,
*run_ids: str,
@@ -449,3 +504,103 @@ def evaluation_lineage(
output_type=self.output_type(),
evaluation_type=self.evaluation_type(),
)


class IncrementalEvaluator(Evaluator[Input, Output, ExpectedOutput, Evaluation]):
""":class:`Evaluator` for evaluating additional runs on top of previous evaluations. Intended for use with :class:`IncrementalEvaluationLogic`.
Args:
dataset_repository: The repository with the examples that will be taken for the evaluation.
run_repository: The repository of the runs to evaluate.
evaluation_repository: The repository that will be used to store evaluation results.
description: Human-readable description for the evaluator.
incremental_evaluation_logic: The logic to use for evaluation.
Generics:
Input: Interface to be passed to the :class:`Task` that shall be evaluated.
Output: Type of the output of the :class:`Task` to be evaluated.
ExpectedOutput: Output that is expected from the run with the supplied input.
Evaluation: Interface of the metrics that come from the evaluated :class:`Task`.
"""

def __init__(
self,
dataset_repository: DatasetRepository,
run_repository: RunRepository,
evaluation_repository: EvaluationRepository,
description: str,
incremental_evaluation_logic: IncrementalEvaluationLogic[
Input, Output, ExpectedOutput, Evaluation
],
) -> None:
super().__init__(
dataset_repository=dataset_repository,
run_repository=run_repository,
evaluation_repository=evaluation_repository,
description=description,
evaluation_logic=incremental_evaluation_logic,
)

def evaluate_additional_runs(
self,
*run_ids: str,
previous_evaluation_ids: Optional[list[str]] = None,
num_examples: Optional[int] = None,
abort_on_error: bool = False,
) -> EvaluationOverview:
"""Evaluate all runs while considering which runs have already been evaluated according to `previous_evaluation_id`.
For each set of successful outputs in the referenced runs,
:func:`EvaluationLogic.do_evaluate` is called and eval metrics are produced &
stored in the provided :class:`EvaluationRepository`.
Args:
run_ids: The runs to be evaluated. Each run is expected to have the same
dataset as input (which implies their tasks have the same input-type)
and their tasks have the same output-type. For each example in the
dataset referenced by the runs the outputs of all runs are collected
and if all of them were successful they are passed on to the implementation
specific evaluation. The method compares all run of the provided ids to each other.
previous_evaluation_ids: IDs of previous evaluation to consider
num_examples: The number of examples which should be evaluated from the given runs.
Always the first n runs stored in the evaluation repository
abort_on_error: Flag to abort all evaluations when an error occurs. Defaults to False.
Returns:
EvaluationOverview: An overview of the evaluation. Individual :class:`Evaluation`s will not be
returned but instead stored in the :class:`EvaluationRepository` provided in the
__init__.
"""

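        # For every previous evaluation, collect the set of run ids whose outputs it already covered.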
previous_run_ids = []
previous_evaluation_ids = previous_evaluation_ids or []

for previous_evaluation_id in previous_evaluation_ids:
prev_run_ids: set[str] = set()
lineages = self.evaluation_lineages(previous_evaluation_id)
for lineage in lineages:
for output in lineage.outputs:
prev_run_ids.add(output.run_id)
previous_run_ids.append(prev_run_ids)

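        # Hand the collected run-id sets to the incremental logic so its do_evaluate
        # can separate already evaluated outputs from new ones.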
cast(
IncrementalEvaluationLogic[Input, Output, ExpectedOutput, Evaluation],
self._evaluation_logic,
).set_previous_run_output_ids(previous_run_ids)
return super().evaluate_runs(
*run_ids, num_examples=num_examples, abort_on_error=abort_on_error
)

def evaluate_runs(
self,
*run_ids: str,
num_examples: Optional[int] = None,
abort_on_error: bool = False,
) -> EvaluationOverview:
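        # A plain evaluate_runs ignores previous evaluations and treats every output as new.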
cast(
IncrementalEvaluationLogic[Input, Output, ExpectedOutput, Evaluation],
self._evaluation_logic,
).set_previous_run_output_ids([])
return super().evaluate_runs(
*run_ids, num_examples=num_examples, abort_on_error=abort_on_error
)
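
For orientation, here is a condensed usage sketch of the API added in this file. It is an editorial illustration, not part of the commit: `RunIdEvaluation`, `CollectRunIdsLogic`, and `PrefixTask` are placeholder implementations modeled on the dummies in the test added below, and the in-memory repositories and `Runner` are the same helpers that test uses.

from pydantic import BaseModel

from intelligence_layer.core.task import Task
from intelligence_layer.core.tracer.tracer import Tracer
from intelligence_layer.evaluation.dataset.domain import Example
from intelligence_layer.evaluation.dataset.in_memory_dataset_repository import (
    InMemoryDatasetRepository,
)
from intelligence_layer.evaluation.evaluation.evaluator import (
    IncrementalEvaluationLogic,
    IncrementalEvaluator,
)
from intelligence_layer.evaluation.evaluation.in_memory_evaluation_repository import (
    InMemoryEvaluationRepository,
)
from intelligence_layer.evaluation.run.domain import SuccessfulExampleOutput
from intelligence_layer.evaluation.run.in_memory_run_repository import (
    InMemoryRunRepository,
)
from intelligence_layer.evaluation.run.runner import Runner


class RunIdEvaluation(BaseModel):
    # Placeholder evaluation result: records which run ids arrived as new vs. already evaluated.
    new_run_ids: list[str]
    old_run_ids: list[list[str]]


class CollectRunIdsLogic(IncrementalEvaluationLogic[str, str, str, RunIdEvaluation]):
    # Placeholder logic: a real implementation would compute metrics here.
    def do_incremental_evaluate(
        self,
        example: Example[str, str],
        outputs: list[SuccessfulExampleOutput[str]],
        already_evaluated_outputs: list[list[SuccessfulExampleOutput[str]]],
    ) -> RunIdEvaluation:
        return RunIdEvaluation(
            new_run_ids=[output.run_id for output in outputs],
            old_run_ids=[
                [output.run_id for output in group]
                for group in already_evaluated_outputs
            ],
        )


class PrefixTask(Task[str, str]):
    # Placeholder task that just tags its input.
    def do_run(self, input: str, tracer: Tracer) -> str:
        return f"{input} (processed)"


dataset_repository = InMemoryDatasetRepository()
run_repository = InMemoryRunRepository()
evaluation_repository = InMemoryEvaluationRepository()
dataset = dataset_repository.create_dataset(
    examples=[Example(input="a", expected_output="a (processed)", id="example_0")],
    dataset_name="usage_sketch",
)

evaluator = IncrementalEvaluator(
    dataset_repository=dataset_repository,
    run_repository=run_repository,
    evaluation_repository=evaluation_repository,
    description="incremental evaluation sketch",
    incremental_evaluation_logic=CollectRunIdsLogic(),
)


def run_task(description: str) -> str:
    runner = Runner(
        task=PrefixTask(),
        dataset_repository=dataset_repository,
        run_repository=run_repository,
        description=description,
    )
    return runner.run_dataset(dataset.id).id


# Initial evaluation of the first run.
first_run_id = run_task("first")
first_overview = evaluator.evaluate_additional_runs(first_run_id)

# Add a second run later: outputs of the first run are handed to the logic as
# already_evaluated_outputs instead of being treated as new.
second_run_id = run_task("second")
second_overview = evaluator.evaluate_additional_runs(
    first_run_id,
    second_run_id,
    previous_evaluation_ids=[first_overview.id],
)

The test in tests/evaluation/test_diff_evaluator.py below exercises the same flow end-to-end and asserts how outputs are split into new and already-evaluated groups.
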
127 changes: 127 additions & 0 deletions tests/evaluation/test_diff_evaluator.py
@@ -0,0 +1,127 @@
from pydantic import BaseModel

from intelligence_layer.core.task import Task
from intelligence_layer.core.tracer.tracer import Tracer
from intelligence_layer.evaluation.dataset.domain import Example
from intelligence_layer.evaluation.dataset.in_memory_dataset_repository import (
InMemoryDatasetRepository,
)
from intelligence_layer.evaluation.evaluation.evaluator import (
IncrementalEvaluationLogic,
IncrementalEvaluator,
)
from intelligence_layer.evaluation.evaluation.in_memory_evaluation_repository import (
InMemoryEvaluationRepository,
)
from intelligence_layer.evaluation.run.domain import SuccessfulExampleOutput
from intelligence_layer.evaluation.run.in_memory_run_repository import (
InMemoryRunRepository,
)
from intelligence_layer.evaluation.run.runner import Runner


class DummyEvaluation(BaseModel):
new_run_ids: list[str]
old_run_ids: list[list[str]]


class DummyIncrementalLogic(IncrementalEvaluationLogic[str, str, str, DummyEvaluation]):
def __init__(self) -> None:
super().__init__()

def do_incremental_evaluate(
self,
example: Example[str, str],
outputs: list[SuccessfulExampleOutput[str]],
already_evaluated_outputs: list[list[SuccessfulExampleOutput[str]]],
) -> DummyEvaluation:
return DummyEvaluation(
new_run_ids=[output.run_id for output in outputs],
old_run_ids=[
[output.run_id for output in evaluated_output]
for evaluated_output in already_evaluated_outputs
],
)


class DummyTask(Task[str, str]):
def __init__(self, info: str) -> None:
super().__init__()
self._info = info

def do_run(self, input: str, tracer: Tracer) -> str:
return f"{input} {self._info}"


def test_incremental_evaluator_should_filter_previous_run_ids() -> None:
# Given
examples = [Example(input="a", expected_output="0", id="id_0")]
dataset_repository = InMemoryDatasetRepository()
run_repository = InMemoryRunRepository()
evaluation_repository = InMemoryEvaluationRepository()
dataset = dataset_repository.create_dataset(
examples=examples, dataset_name="test_examples"
)

evaluator = IncrementalEvaluator(
dataset_repository=dataset_repository,
run_repository=run_repository,
evaluation_repository=evaluation_repository,
description="test_incremental_evaluator",
incremental_evaluation_logic=DummyIncrementalLogic(),
)

def create_run(name: str) -> str:
runner = Runner(
task=DummyTask(name),
dataset_repository=dataset_repository,
run_repository=run_repository,
description=f"Runner of {name}",
)
return runner.run_dataset(dataset.id).id

first_run_id = create_run("first")

first_evaluation_overview = evaluator.evaluate_additional_runs(first_run_id)

second_run_id = create_run("second")

second_evaluation_overview = evaluator.evaluate_additional_runs(
first_run_id,
second_run_id,
previous_evaluation_ids=[first_evaluation_overview.id],
)

second_result = next(
iter(evaluator.evaluation_lineages(second_evaluation_overview.id))
).evaluation.result
assert isinstance(second_result, DummyEvaluation)
assert second_result.new_run_ids == [second_run_id]
assert second_result.old_run_ids == [[first_run_id]]

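    # Start a separate, independent evaluation that is not part of the first chain.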
independent_run_id = create_run("independent")

independent_evaluation_overview = evaluator.evaluate_additional_runs(
independent_run_id
)

third_run_id = create_run("third")

third_evaluation_overview = evaluator.evaluate_additional_runs(
first_run_id,
second_run_id,
independent_run_id,
third_run_id,
previous_evaluation_ids=[
second_evaluation_overview.id,
independent_evaluation_overview.id,
],
)

third_result = next(
iter(evaluator.evaluation_lineages(third_evaluation_overview.id))
).evaluation.result
assert isinstance(third_result, DummyEvaluation)
assert third_result.new_run_ids == [third_run_id]
assert sorted(third_result.old_run_ids[0]) == sorted([first_run_id, second_run_id])
assert sorted(third_result.old_run_ids[1]) == sorted([independent_run_id])
