feat: Add IncrementalEvaluator and IncrementalEvaluationLogic
Task: IL-315
FelixFehse authored and SebastianNiehusAA committed May 2, 2024
1 parent 9a58a4b commit b67d3e8
Showing 3 changed files with 192 additions and 2 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
@@ -7,7 +7,8 @@
...

### New Features
...
- Add new `IncrementalEvaluator` for easier addition of runs to existing evaluations without re-evaluating previously evaluated runs.
- Add `IncrementalEvaluationLogic` for use in the `IncrementalEvaluator`.

### Fixes
...
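Taken together, the two entries above amount to the following workflow (a minimal sketch, not part of this commit: the repository instances, `my_logic`, and the run variables are placeholders; a concrete logic subclass is sketched after the evaluator.py diff below):

# Hypothetical usage sketch; the repositories, my_logic, and the run objects are placeholders.
evaluator = IncrementalEvaluator(
    dataset_repository=dataset_repository,
    run_repository=run_repository,
    evaluation_repository=evaluation_repository,
    description="my incremental evaluation",
    incremental_evaluation_logic=my_logic,
)
# First evaluation over the runs that exist so far.
first_overview = evaluator.evaluate_additional_runs(first_run.id)
# Later, add a new run: outputs of first_run reach the logic as already-evaluated
# outputs, while outputs of second_run arrive as new outputs.
second_overview = evaluator.evaluate_additional_runs(
    first_run.id, second_run.id, previous_evaluation_id=first_overview.id
)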
91 changes: 90 additions & 1 deletion src/intelligence_layer/evaluation/evaluation/evaluator.py
@@ -65,6 +65,39 @@ def do_evaluate(
        pass


class IncrementalEvaluationLogic(
    EvaluationLogic[Input, Output, ExpectedOutput, Evaluation]
):
    def __init__(self) -> None:
        super().__init__()
        self._previous_run_output_ids: set[str] = set()

    def set_previous_run_output_ids(self, previous_run_output_ids: set[str]) -> None:
        self._previous_run_output_ids = previous_run_output_ids

    def do_evaluate(
        self,
        example: Example[Input, ExpectedOutput],
        *outputs: SuccessfulExampleOutput[Output],
    ) -> Evaluation:
        evaluated_outputs = [
            output
            for output in outputs
            if output.run_id in self._previous_run_output_ids
        ]
        new_outputs = [output for output in outputs if output not in evaluated_outputs]
        return self.do_incremental_evaluate(example, new_outputs, evaluated_outputs)

    @abstractmethod
    def do_incremental_evaluate(
        self,
        example: Example[Input, ExpectedOutput],
        outputs: list[SuccessfulExampleOutput[Output]],
        evaluated_outputs: list[SuccessfulExampleOutput[Output]],
    ) -> Evaluation:
        pass


class SingleOutputEvaluationLogic(
EvaluationLogic[Input, Output, ExpectedOutput, Evaluation]
):
@@ -221,7 +254,6 @@ def evaluation_type(self) -> type[Evaluation]:
        )
        return cast(type[Evaluation], evaluation_type)

    @final
    def evaluate_runs(
        self,
        *run_ids: str,
@@ -449,3 +481,60 @@ def evaluation_lineage(
            output_type=self.output_type(),
            evaluation_type=self.evaluation_type(),
        )


class IncrementalEvaluator(Evaluator[Input, Output, ExpectedOutput, Evaluation]):
    def __init__(
        self,
        dataset_repository: DatasetRepository,
        run_repository: RunRepository,
        evaluation_repository: EvaluationRepository,
        description: str,
        incremental_evaluation_logic: IncrementalEvaluationLogic[
            Input, Output, ExpectedOutput, Evaluation
        ],
    ) -> None:
        super().__init__(
            dataset_repository=dataset_repository,
            run_repository=run_repository,
            evaluation_repository=evaluation_repository,
            description=description,
            evaluation_logic=incremental_evaluation_logic,
        )

    def evaluate_additional_runs(
        self,
        *run_ids: str,
        previous_evaluation_id: Optional[str] = None,
        num_examples: Optional[int] = None,
        abort_on_error: bool = False,
    ) -> EvaluationOverview:
        previous_run_ids = set()

        if previous_evaluation_id is not None:
            lineages = self.evaluation_lineages(previous_evaluation_id)
            for lineage in lineages:
                for output in lineage.outputs:
                    previous_run_ids.add(output.run_id)

        cast(
            IncrementalEvaluationLogic[Input, Output, ExpectedOutput, Evaluation],
            self._evaluation_logic,
        ).set_previous_run_output_ids(previous_run_ids)
        return super().evaluate_runs(
            *run_ids, num_examples=num_examples, abort_on_error=abort_on_error
        )

    def evaluate_runs(
        self,
        *run_ids: str,
        num_examples: Optional[int] = None,
        abort_on_error: bool = False,
    ) -> EvaluationOverview:
        cast(
            IncrementalEvaluationLogic[Input, Output, ExpectedOutput, Evaluation],
            self._evaluation_logic,
        ).set_previous_run_output_ids(set())
        return super().evaluate_runs(
            *run_ids, num_examples=num_examples, abort_on_error=abort_on_error
        )
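For context, a minimal sketch (not part of this commit) of what a concrete `IncrementalEvaluationLogic` subclass could look like; `MatchCount` and `MatchCountLogic` are made-up names, and the logic simply counts how many of the newly added outputs match the expected output while recording how many already-evaluated outputs were skipped:

# Hypothetical example, not part of the commit: score only the newly added outputs
# and record how many already-evaluated outputs were handed over unscored.
from pydantic import BaseModel

from intelligence_layer.evaluation.dataset.domain import Example
from intelligence_layer.evaluation.evaluation.evaluator import IncrementalEvaluationLogic
from intelligence_layer.evaluation.run.domain import SuccessfulExampleOutput


class MatchCount(BaseModel):
    matching_new_outputs: int
    already_evaluated_outputs: int


class MatchCountLogic(IncrementalEvaluationLogic[str, str, str, MatchCount]):
    def do_incremental_evaluate(
        self,
        example: Example[str, str],
        outputs: list[SuccessfulExampleOutput[str]],
        evaluated_outputs: list[SuccessfulExampleOutput[str]],
    ) -> MatchCount:
        return MatchCount(
            matching_new_outputs=sum(
                1 for output in outputs if output.output == example.expected_output
            ),
            already_evaluated_outputs=len(evaluated_outputs),
        )

The `DummyIncrementalLogic` in the test file below follows the same pattern, only recording run ids instead of scoring outputs.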
100 changes: 100 additions & 0 deletions tests/evaluation/test_diff_evaluator.py
@@ -0,0 +1,100 @@
from pydantic import BaseModel

from intelligence_layer.core.task import Task
from intelligence_layer.core.tracer.tracer import Tracer
from intelligence_layer.evaluation.dataset.domain import Example
from intelligence_layer.evaluation.dataset.in_memory_dataset_repository import (
    InMemoryDatasetRepository,
)
from intelligence_layer.evaluation.evaluation.evaluator import (
    IncrementalEvaluationLogic,
    IncrementalEvaluator,
)
from intelligence_layer.evaluation.evaluation.in_memory_evaluation_repository import (
    InMemoryEvaluationRepository,
)
from intelligence_layer.evaluation.run.domain import SuccessfulExampleOutput
from intelligence_layer.evaluation.run.in_memory_run_repository import (
    InMemoryRunRepository,
)
from intelligence_layer.evaluation.run.runner import Runner


class DummyEvaluation(BaseModel):
    new_run_ids: list[str]
    old_run_ids: list[str]


class DummyIncrementalLogic(IncrementalEvaluationLogic[str, str, str, DummyEvaluation]):
    def __init__(self) -> None:
        super().__init__()

    def do_incremental_evaluate(
        self,
        example: Example[str, str],
        outputs: list[SuccessfulExampleOutput[str]],
        evaluated_outputs: list[SuccessfulExampleOutput[str]],
    ) -> DummyEvaluation:
        return DummyEvaluation(
            new_run_ids=[output.run_id for output in outputs],
            old_run_ids=[output.run_id for output in evaluated_outputs],
        )


class DummyTask(Task[str, str]):
    def __init__(self, info: str) -> None:
        super().__init__()
        self._info = info

    def do_run(self, input: str, tracer: Tracer) -> str:
        return f"{input} {self._info}"


def test_incremental_evaluator_should_filter_previous_run_ids() -> None:
    # Given
    examples = [Example(input="a", expected_output="0", id="id_0")]

    dataset_repository = InMemoryDatasetRepository()
    dataset = dataset_repository.create_dataset(
        examples=examples, dataset_name="test_examples"
    )

    run_repository = InMemoryRunRepository()
    old_runner = Runner(
        task=DummyTask("Task0"),
        dataset_repository=dataset_repository,
        run_repository=run_repository,
        description="test_runner_0",
    )
    old_run = old_runner.run_dataset(dataset.id)

    evaluation_repository = InMemoryEvaluationRepository()
    evaluator = IncrementalEvaluator(
        dataset_repository=dataset_repository,
        run_repository=run_repository,
        evaluation_repository=evaluation_repository,
        description="test_incremental_evaluator",
        incremental_evaluation_logic=DummyIncrementalLogic(),
    )
    evaluation_overview = evaluator.evaluate_additional_runs(old_run.id)

    new_runner = Runner(
        task=DummyTask("Task2"),
        dataset_repository=dataset_repository,
        run_repository=run_repository,
        description="test_runner_2",
    )
    new_run = new_runner.run_dataset(dataset.id)

    # When
    new_evaluation_overview = evaluator.evaluate_additional_runs(
        old_run.id, new_run.id, previous_evaluation_id=evaluation_overview.id
    )

    # Then
    result = next(
        iter(evaluator.evaluation_lineages(new_evaluation_overview.id))
    ).evaluation.result
    assert isinstance(result, DummyEvaluation)
    assert result.new_run_ids == [new_run.id]
    assert result.old_run_ids == [old_run.id]
