From cb8843c3d71b5d0090adecc914d043d61e5fa351 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Niklas=20K=C3=B6hnecke?= <155443293+NiklasKoehneckeAA@users.noreply.github.com> Date: Tue, 14 May 2024 16:53:36 +0200 Subject: [PATCH] feat: improved EvaluationOverviews and Argilla Integration (#829) * create EvaluationLogicBase to fix type magic for async case * create EvaluatorBase to share common evaluation behavior * remove ArgillaEvaluationRepository as it is no longer needed * Implement an AsyncEvaluator that serves as the base for the ArgillaEvaluator * Refactor Argilla* dependent classes * Adjust tutorials accordingly Task: IL-298 --------- Co-authored-by: Merlin Kallenborn Co-authored-by: Johannes Wesch --- CHANGELOG.md | 26 +- .../how_tos/how_to_evaluate_runs.ipynb | 8 +- .../how_to_human_evaluation_via_argilla.ipynb | 194 ++---- ...ple_evaluation_and_aggregation_logic.ipynb | 6 +- src/documentation/human_evaluation.ipynb | 148 +++-- .../connectors/argilla/argilla_client.py | 4 +- src/intelligence_layer/evaluation/__init__.py | 54 +- .../evaluation/aggregation/aggregator.py | 4 +- .../aggregation/argilla_aggregator.py | 117 ---- .../evaluation/aggregation/elo.py | 68 +- .../argilla_evaluation_repository.py | 166 ----- .../evaluation/argilla_evaluator.py | 190 ------ .../evaluation/evaluation/domain.py | 59 +- .../evaluation/evaluation/evaluator.py | 606 ------------------ .../evaluation/evaluator/argilla_evaluator.py | 311 +++++++++ .../evaluation/evaluator/async_evaluator.py | 108 ++++ .../evaluation/evaluator/base_evaluator.py | 376 +++++++++++ .../evaluation/evaluator/evaluator.py | 190 ++++++ .../evaluator/incremental_evaluator.py | 172 +++++ .../evaluation/file_evaluation_repository.py | 40 +- .../in_memory_evaluation_repository.py | 29 + .../evaluation/run_evaluation.py | 2 +- tests/conftest.py | 25 +- tests/evaluation/conftest.py | 17 +- .../test_argilla_evaluation_repository.py | 305 --------- tests/evaluation/test_argilla_evaluator.py | 333 ++++++---- .../test_async_evaluation_repository.py | 186 ++++++ tests/evaluation/test_dataset_repository.py | 4 +- tests/evaluation/test_diff_evaluator.py | 19 +- .../evaluation/test_evaluation_repository.py | 37 +- tests/evaluation/test_evaluator.py | 25 +- ...t_instruct_comparison_argilla_evaluator.py | 114 ++-- 32 files changed, 2096 insertions(+), 1847 deletions(-) delete mode 100644 src/intelligence_layer/evaluation/aggregation/argilla_aggregator.py delete mode 100644 src/intelligence_layer/evaluation/evaluation/argilla_evaluation_repository.py delete mode 100644 src/intelligence_layer/evaluation/evaluation/argilla_evaluator.py delete mode 100644 src/intelligence_layer/evaluation/evaluation/evaluator.py create mode 100644 src/intelligence_layer/evaluation/evaluation/evaluator/argilla_evaluator.py create mode 100644 src/intelligence_layer/evaluation/evaluation/evaluator/async_evaluator.py create mode 100644 src/intelligence_layer/evaluation/evaluation/evaluator/base_evaluator.py create mode 100644 src/intelligence_layer/evaluation/evaluation/evaluator/evaluator.py create mode 100644 src/intelligence_layer/evaluation/evaluation/evaluator/incremental_evaluator.py delete mode 100644 tests/evaluation/test_argilla_evaluation_repository.py create mode 100644 tests/evaluation/test_async_evaluation_repository.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 394959d95..60e88b254 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,11 +2,33 @@ ## Unreleased +We did a major revamp of the `ArgillaEvaluator` to separate an `AsyncEvaluator` from the normal 
evaluation scenario.
+This comes with easier-to-understand interfaces, more information in the `EvaluationOverview`, and a simplified aggregation step for Argilla that no longer depends on specific Argilla types.
+Check the how-to [here](./src/documentation/how_tos/how_to_human_evaluation_via_argilla.ipynb) for detailed information; a minimal sketch of the new flow follows the feature list below.
+
 ### Breaking Changes
-...
+
+- rename: `AggregatedInstructComparison` to `AggregatedComparison`
+- rename: `InstructComparisonArgillaAggregationLogic` to `ComparisonAggregationLogic`
+- remove: `ArgillaAggregator` - the regular `Aggregator` now does the job
+- remove: `ArgillaEvaluationRepository` - the `ArgillaEvaluator` now uses an `AsyncEvaluationRepository`, which extends the existing `EvaluationRepository` for the human-feedback use case
+- `ArgillaEvaluationLogic` now uses `to_record` and `from_record` instead of `do_evaluate`. The signature of `to_record` stays the same. The `Field`s and `Question`s are now defined in the logic instead of being passed to the `ArgillaEvaluationRepository`
+- `ArgillaEvaluator` now takes the `ArgillaClient` as well as the `workspace_id`. It inherits from the abstract `AsyncEvaluator` and no longer has `evaluate_runs` and `evaluate`; instead, it has `submit` and `retrieve`.
+- `EvaluationOverview` gains the attributes `end_date`, `successful_evaluation_count` and `failed_evaluation_count`
+  - rename: `start` is now called `start_date` and is no longer optional
+- We refactored the internals of `Evaluator`. This is only relevant if you subclass it; most of the typing and data handling has moved to `EvaluatorBase`.
+
 ### New Features
-...
+- Add `ComparisonEvaluation` for the Elo evaluation to abstract from the Argilla record
+- Add `AsyncEvaluator` for human-feedback evaluation; `ArgillaEvaluator` inherits from it
+  - `.submit` pushes all evaluations to Argilla for labelling
+  - Add `PartialEvaluationOverview` to store the submission details
+  - `.retrieve` then collects all labelled records from Argilla and stores them in an `AsyncEvaluationRepository`
+  - Add `AsyncEvaluationRepository` to store and retrieve `PartialEvaluationOverview`s. Also add `AsyncFileEvaluationRepository` and `AsyncInMemoryEvaluationRepository`
+- Add `EvaluatorBase` and `EvaluationLogicBase` as base classes for both async and synchronous evaluation.
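The sketch below shows the new submit/retrieve flow end to end. It only uses names introduced in this PR; `my_dataset_repository`, `my_run_repository`, `MyArgillaEvaluationLogic`, and the run ID are placeholders for your own objects, so treat it as an illustration rather than shipped code.

```python
from intelligence_layer.connectors.argilla.argilla_client import DefaultArgillaClient
from intelligence_layer.evaluation import (
    ArgillaEvaluator,
    AsyncInMemoryEvaluationRepository,
)

# Reads ARGILLA_API_URL and ARGILLA_API_KEY from the environment.
client = DefaultArgillaClient()
workspace_id = client.ensure_workspace_exists("my-workspace")

evaluator = ArgillaEvaluator(
    my_dataset_repository,                # placeholder: your DatasetRepository
    my_run_repository,                    # placeholder: your RunRepository
    AsyncInMemoryEvaluationRepository(),  # replaces the removed ArgillaEvaluationRepository
    "my evaluation description",
    MyArgillaEvaluationLogic(),           # placeholder: defines Fields/Questions, to_record and from_record
    argilla_client=client,
    workspace_id=workspace_id,
)

# `submit` replaces `evaluate_runs`: it only pushes records to Argilla for labelling.
partial_overview = evaluator.submit("your-run-id")

# ... label the records in the Argilla UI ...

# `retrieve` collects the labelled records and builds the final EvaluationOverview.
evaluation_overview = evaluator.retrieve(partial_overview.id)
```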
+ + ### Fixes - Improve description of using artifactory tokens for installation of IL diff --git a/src/documentation/how_tos/how_to_evaluate_runs.ipynb b/src/documentation/how_tos/how_to_evaluate_runs.ipynb index 403ebcb1a..91d05c023 100644 --- a/src/documentation/how_tos/how_to_evaluate_runs.ipynb +++ b/src/documentation/how_tos/how_to_evaluate_runs.ipynb @@ -8,10 +8,7 @@ "source": [ "from example_data import DummyEvaluationLogic, example_data\n", "\n", - "from intelligence_layer.evaluation.evaluation.evaluator import Evaluator\n", - "from intelligence_layer.evaluation.evaluation.in_memory_evaluation_repository import (\n", - " InMemoryEvaluationRepository,\n", - ")" + "from intelligence_layer.evaluation import Evaluator, InMemoryEvaluationRepository" ] }, { @@ -40,6 +37,7 @@ "outputs": [], "source": [ "# Step 0\n", + "\n", "my_example_data = example_data()\n", "print()\n", "run_ids = [my_example_data.run_overview_1.id, my_example_data.run_overview_2.id]\n", @@ -82,7 +80,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.10.12" + "version": "3.11.8" } }, "nbformat": 4, diff --git a/src/documentation/how_tos/how_to_human_evaluation_via_argilla.ipynb b/src/documentation/how_tos/how_to_human_evaluation_via_argilla.ipynb index b7de9e085..f032788a5 100644 --- a/src/documentation/how_tos/how_to_human_evaluation_via_argilla.ipynb +++ b/src/documentation/how_tos/how_to_human_evaluation_via_argilla.ipynb @@ -6,8 +6,6 @@ "metadata": {}, "outputs": [], "source": [ - "from typing import Iterable\n", - "\n", "from dotenv import load_dotenv\n", "from pydantic import BaseModel\n", "\n", @@ -19,13 +17,9 @@ " RecordData,\n", ")\n", "from intelligence_layer.evaluation import (\n", - " AggregationLogic,\n", - " ArgillaAggregator,\n", " ArgillaEvaluationLogic,\n", - " ArgillaEvaluationRepository,\n", + " AsyncInMemoryEvaluationRepository,\n", " Example,\n", - " InMemoryAggregationRepository,\n", - " InMemoryEvaluationRepository,\n", " RecordDataSequence,\n", " SuccessfulExampleOutput,\n", ")\n", @@ -40,13 +34,17 @@ "# How to evaluate with human evaluation via Argilla\n", "1. Initialize an Argilla client with the correct settings for your setup\n", " - By default, the url and api key are read from the environment variables `ARGILLA_API_URL` and `ARGILLA_API_KEY`\n", - "2. Create `Question`s and `Field`s to structure the data that will be displayed in Argilla\n", - "3. Choose an Argilla workspace and get its ID\n", - "4. Create an `ArgillaEvaluationRepository`\n", + "2. Choose an Argilla workspace and get its ID\n", + "3. Create an `AsyncEvaluationRepository`\n", + "4. Define new output type for the evaluation\n", "5. Implement an `ArgillaEvaluationLogic`\n", + " 1. Create `Question`s and `Field`s to structure the data that will be displayed in Argilla\n", + " 2. Implement `to_record` to convert the task input into an Argilla record\n", + " 3. Implement `from_record` to convert the record back to an evaluation result\n", "6. Submit tasks to the Argilla instance by running the `ArgillaEvaluator`\n", - " - Make sure to save the `EvaluationOverview.id`, as it is needed to retrieve the results later\n", - "7. **Use the Argilla web platform to evaluate** " + "7. **Use the Argilla web platform to evaluate** \n", + "8. 
Collect all labelled evaluations from Argilla\n", + " - Make sure to save the `EvaluationOverview.id`, as it is needed to retrieve the results later" ] }, { @@ -62,54 +60,66 @@ "metadata": {}, "outputs": [], "source": [ + "# Step 0\n", + "\n", + "\n", + "class StoryTaskInput(BaseModel): # Should already be implemented in your task\n", + " topic: str\n", + " targeted_word_count: int\n", + "\n", + "\n", + "class StoryTaskOutput(BaseModel): # Should already be implemented in your task\n", + " story: str\n", + "\n", + "\n", "# Step 1\n", + "\n", + "\n", "client = DefaultArgillaClient(\n", " # api_url=\"your url here\", # not necessary if ARGILLA_API_URL is set in environment\n", " # api_key=\"your api key here\", # not necessary if ARGILLA_API_KEY is set in environment\n", ")\n", "\n", - "# Step 2\n", - "questions = [\n", - " Question(\n", - " name=\"rating\",\n", - " title=\"Funniness\",\n", - " description=\"How funny do you think is the joke? Rate it from 1-5.\",\n", - " options=range(1, 6),\n", - " )\n", - "]\n", - "fields = [\n", - " Field(name=\"input\", title=\"Topic\"),\n", - " Field(name=\"output\", title=\"Joke\"),\n", - "]\n", "\n", - "# Step 3\n", + "# Step 2\n", "workspace_id = client.ensure_workspace_exists(\"my-workspace-name\")\n", "\n", - "# Step 4\n", - "data_storage = (\n", - " InMemoryEvaluationRepository()\n", + "# Step 3\n", + "evaluation_repository = (\n", + " AsyncInMemoryEvaluationRepository()\n", ") # Use FileEvaluationRepository for persistent results\n", - "evaluation_repository = ArgillaEvaluationRepository(\n", - " data_storage, client, workspace_id, fields, questions\n", - ")\n", "\n", "\n", - "# Step 5\n", - "class StoryTaskInput(BaseModel): # Should already be implemented in your task\n", - " topic: str\n", - " targeted_word_count: int\n", - "\n", - "\n", - "class StoryTaskOutput(BaseModel): # Should already be implemented in your task\n", - " story: str\n", + "# Step 4\n", + "class FunnyOutputRating(BaseModel):\n", + " rating: int\n", "\n", "\n", + "# Step 5\n", "class CustomArgillaEvaluationLogic(\n", " ArgillaEvaluationLogic[\n", - " StoryTaskInput, StoryTaskOutput, None\n", + " StoryTaskInput, StoryTaskOutput, None, FunnyOutputRating\n", " ] # No expected output, therefore \"None\"\n", "):\n", - " def _to_record(\n", + " # Step 5.1\n", + " def __init__(self):\n", + " super().__init__(\n", + " questions=[\n", + " Question(\n", + " name=\"rating\",\n", + " title=\"Funniness\",\n", + " description=\"How funny do you think is the joke? 
Rate it from 1-5.\",\n", + " options=range(1, 6),\n", + " )\n", + " ],\n", + " fields=[\n", + " Field(name=\"input\", title=\"Topic\"),\n", + " Field(name=\"output\", title=\"Joke\"),\n", + " ],\n", + " )\n", + "\n", + " # Step 5.2\n", + " def to_record(\n", " self,\n", " example: Example[StoryTaskInput, None],\n", " *output: SuccessfulExampleOutput[StoryTaskOutput],\n", @@ -128,6 +138,10 @@ " ]\n", " )\n", "\n", + " # Step 5.3\n", + " def from_record(self, argilla_evaluation: ArgillaEvaluation) -> FunnyOutputRating:\n", + " return FunnyOutputRating(rating=argilla_evaluation.metadata[\"rating\"])\n", + "\n", "\n", "evaluation_logic = CustomArgillaEvaluationLogic()" ] @@ -145,16 +159,25 @@ "runs_to_evaluate = [\"your_run_id_of_interest\", \"other_run_id_of_interest\"]\n", "\n", "evaluator = ArgillaEvaluator(\n", - " ..., evaluation_repository, description=\"My evaluation description\", evaluation_logic=evaluation_logic\n", + " ...,\n", + " evaluation_repository,\n", + " description=\"My evaluation description\",\n", + " evaluation_logic=evaluation_logic,\n", + " argilla_client=client,\n", + " workspace_id=workspace_id,\n", ")\n", - "evaluation_overview = evaluator.evaluate_runs(*runs_to_evaluate)\n", - "print(\"ID to retrieve results later: \", evaluation_overview.id)\n", + "partial_evaluation_overview = evaluator.submit(*runs_to_evaluate)\n", + "print(\"ID to retrieve results later: \", partial_evaluation_overview.id)\n", "\n", "# Step 7\n", "\n", "####################################\n", "# Evaluate via the Argilla UI here #\n", - "####################################" + "####################################\n", + "\n", + "# Step 8\n", + "\n", + "evaluation_overview = evaluator.retrieve(partial_evaluation_overview.id)" ] }, { @@ -165,83 +188,6 @@ "```python\n", "```" ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "# How to aggregate an Argilla evaluation\n", - "0. Submit tasks to Argilla and perform an evaluation (see [here](#how-to-evaluate-with-human-evaluation-via-argilla)).\n", - "1. Implement an `AggregationLogic` that takes `ArgillaEvaluation`s as input.\n", - "2. Remember the ID of the evaluation and the name of the Argilla workspace that you want to aggregate.\n", - "3. Initialize the `ArgillaEvaluationRepository` and an aggregation repository.\n", - "4. Aggregate the results with an `ArgillaAggregator`." 
- ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# Step 1\n", - "\n", - "\n", - "class CustomArgillaAggregation(BaseModel):\n", - " avg_funniness: float\n", - "\n", - "\n", - "class CustomArgillaAggregationLogic(\n", - " AggregationLogic[ArgillaEvaluation, CustomArgillaAggregation]\n", - "):\n", - " def aggregate(\n", - " self, evaluations: Iterable[ArgillaEvaluation]\n", - " ) -> CustomArgillaAggregation:\n", - " evaluation_list = list(evaluations)\n", - " total_score = sum(\n", - " evaluation.metadata[\n", - " \"rating\"\n", - " ] # This name is defined by the `Question`s given to the Argilla repository during submission\n", - " for evaluation in evaluation_list\n", - " )\n", - " return CustomArgillaAggregation(\n", - " avg_funniness=total_score / len(evaluation_list)\n", - " )\n", - "\n", - "\n", - "aggregation_logic = CustomArgillaAggregationLogic()\n", - "\n", - "# Step 2 - See the first example for more info\n", - "eval_id = \"my-previous-eval-id\"\n", - "client = DefaultArgillaClient()\n", - "workspace_id = client.ensure_workspace_exists(\"my-workspace-name\")\n", - "\n", - "# Step 3\n", - "evaluation_repository = ArgillaEvaluationRepository(\n", - " InMemoryEvaluationRepository(), client, workspace_id\n", - ")\n", - "aggregation_repository = InMemoryAggregationRepository()\n", - "\n", - "# Step 4\n", - "aggregator = ArgillaAggregator(\n", - " evaluation_repository,\n", - " aggregation_repository,\n", - " \"My aggregation description\",\n", - " aggregation_logic,\n", - ")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "%%script false --no-raise-error\n", - "# we skip this as we do not have a dataset or run in this example\n", - "\n", - "aggregation = aggregator.aggregate_evaluation(eval_id)" - ] } ], "metadata": { @@ -260,7 +206,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.10.12" + "version": "3.11.8" } }, "nbformat": 4, diff --git a/src/documentation/how_tos/how_to_implement_a_simple_evaluation_and_aggregation_logic.ipynb b/src/documentation/how_tos/how_to_implement_a_simple_evaluation_and_aggregation_logic.ipynb index 2d90e7781..a465cf235 100644 --- a/src/documentation/how_tos/how_to_implement_a_simple_evaluation_and_aggregation_logic.ipynb +++ b/src/documentation/how_tos/how_to_implement_a_simple_evaluation_and_aggregation_logic.ipynb @@ -12,9 +12,9 @@ "from dotenv import load_dotenv\n", "from pydantic import BaseModel\n", "\n", - "from intelligence_layer.evaluation.aggregation.aggregator import AggregationLogic\n", - "from intelligence_layer.evaluation.dataset.domain import Example\n", - "from intelligence_layer.evaluation.evaluation.evaluator import (\n", + "from intelligence_layer.evaluation import (\n", + " AggregationLogic,\n", + " Example,\n", " SingleOutputEvaluationLogic,\n", ")\n", "\n", diff --git a/src/documentation/human_evaluation.ipynb b/src/documentation/human_evaluation.ipynb index 486a1c56b..992e98fed 100644 --- a/src/documentation/human_evaluation.ipynb +++ b/src/documentation/human_evaluation.ipynb @@ -8,7 +8,7 @@ "source": [ "import shutil\n", "from pathlib import Path\n", - "from typing import Iterable, cast\n", + "from typing import Iterable\n", "\n", "from datasets import load_dataset\n", "from dotenv import load_dotenv\n", @@ -30,14 +30,13 @@ ")\n", "from intelligence_layer.evaluation import (\n", " AggregationLogic,\n", - " ArgillaAggregator,\n", + " Aggregator,\n", " 
ArgillaEvaluationLogic,\n", - " ArgillaEvaluationRepository,\n", " ArgillaEvaluator,\n", + " AsyncFileEvaluationRepository,\n", " Example,\n", " FileAggregationRepository,\n", " FileDatasetRepository,\n", - " FileEvaluationRepository,\n", " FileRunRepository,\n", " RecordDataSequence,\n", " Runner,\n", @@ -48,6 +47,7 @@ "\n", "client = LimitedConcurrencyClient.from_env()\n", "\n", + "\n", "REPOSITORY_ROOT_PATH = Path(\"human-eval-data\")" ] }, @@ -239,7 +239,9 @@ "metadata": {}, "source": [ "At the end of our evaluation we want a float score $s \\in [1,5]$ describing the model performance.\n", - "We define this as an `InstructAggregatedEvaluation`, which will be used in our aggregation later." + "We define this as an `InstructAggregatedEvaluation`, which will be used in our aggregation later.\n", + "\n", + "We also define the `InstructEvaluation`, which represents an evaluation of a single entry, which we will aggregate later." ] }, { @@ -251,7 +253,12 @@ "class InstructAggregatedEvaluation(BaseModel):\n", " general_rating: float | None\n", " fluency: float | None\n", - " evaluated_examples: int" + " evaluated_examples: int\n", + "\n", + "\n", + "class InstructEvaluation(BaseModel):\n", + " general_rating: float\n", + " fluency: float" ] }, { @@ -259,8 +266,9 @@ "metadata": {}, "source": [ "We can now start to define our human evaluation. This is done with `Questions` and `Fields`. \n", - "`Fields` define what a user has to evaluate. In our example, this will be the model input (Instruction) and output (Model Completion). Note that the field names have to match the content keys from the `RecordData` which we will define later in our `InstructArgillaEvaluationLogic`. \n", - "`Questions` are what a user has to answer in order to evaluate the `Fields`. The `name` property will later be used to access the human ratings in the aggregation step. In our case we ask how complete and how fluent the completions are." + "`Fields` define what a user has to evaluate. In our example, this will be the model input (Instruction) and output (Model Completion). \n", + "`Questions` are what a user has to answer in order to evaluate the `Fields`. The `name` property will later be used to access the human ratings. \n", + "Both of these are passed to the `ArgillaEvaluationLogic` to create `RecordData` to convert data back and forth from Argilla. 
" ] }, { @@ -284,10 +292,10 @@ " ),\n", "]\n", "\n", - "fields = [\n", - " Field(name=\"input\", title=\"Instruction\"),\n", - " Field(name=\"output\", title=\"Model Completion\"),\n", - "]" + "fields = {\n", + " \"input\": Field(name=\"input\", title=\"Instruction\"),\n", + " \"output\": Field(name=\"output\", title=\"Model Completion\"),\n", + "}" ] }, { @@ -297,8 +305,8 @@ "Our defined fields and questions will look like this:\n", "![Argilla Interface](../../assets/argilla_interface.png)\n", "\n", - "We can now define our `InstructArgillaEvaluationLogic` and `InstructArgillaAggregationLogic`.\n", - "They have to implement the two abstract methods `_to_record` and `aggregate` respectively.\n", + "We can now define our `InstructArgillaEvaluationLogic` to translate our data to specific Argilla formats .\n", + "The logic has to implement the two abstract methods `to_record` and `from_record`.\n", "Lets look at the documentation:" ] }, @@ -308,16 +316,16 @@ "metadata": {}, "outputs": [], "source": [ - "help(ArgillaEvaluationLogic._to_record)\n", + "help(ArgillaEvaluationLogic.to_record)\n", "print(\"-\" * 100)\n", - "help(AggregationLogic.aggregate)" + "help(ArgillaEvaluationLogic.from_record)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "Instead of performing the evaluation, the `ArgillaEvaluationLogic` is responsible for converting the evaluation data to a format that is accepted by Argilla. During the evaluation, these records will simply be submitted to Argilla. \n", + "Instead of performing the evaluation, the `ArgillaEvaluationLogic` is responsible for converting the evaluation data to a format that is accepted by Argilla. During the evaluation, these records will simply be submitted to Argilla and retrieved later.\n", "We will now create everything we need to submit these evaluations to our Argilla instance." 
] }, @@ -332,9 +340,10 @@ " InstructInput,\n", " CompleteOutput,\n", " None,\n", + " InstructEvaluation,\n", " ]\n", "):\n", - " def _to_record(\n", + " def to_record(\n", " self,\n", " example: Example[InstructInput, None],\n", " *example_outputs: SuccessfulExampleOutput[CompleteOutput],\n", @@ -343,34 +352,39 @@ " records=[\n", " RecordData(\n", " content={\n", - " \"input\": example.input.instruction,\n", - " \"output\": example_outputs[0].output.completion,\n", + " self.fields[\"input\"].name: example.input.instruction,\n", + " self.fields[\"output\"].name: example_outputs[\n", + " 0\n", + " ].output.completion,\n", " },\n", " example_id=example.id,\n", " )\n", " ]\n", " )\n", "\n", + " def from_record(self, argilla_evaluation: ArgillaEvaluation) -> InstructEvaluation:\n", + " return InstructEvaluation(\n", + " general_rating=argilla_evaluation.responses[\"general_rating\"],\n", + " fluency=argilla_evaluation.responses[\"fluency\"],\n", + " )\n", + "\n", "\n", "argilla_client = DefaultArgillaClient()\n", "workspace_id = argilla_client.ensure_workspace_exists(\"test-human-eval\")\n", "\n", "dataset_repository = FileDatasetRepository(REPOSITORY_ROOT_PATH)\n", "run_repository = FileRunRepository(REPOSITORY_ROOT_PATH)\n", - "evaluation_repository = FileEvaluationRepository(\n", - " REPOSITORY_ROOT_PATH\n", - ") # this is only used to store failed evaluations and the evaluation overview\n", - "argilla_evaluation_repository = ArgillaEvaluationRepository(\n", - " evaluation_repository, argilla_client, workspace_id, fields, questions\n", - ")\n", + "evaluation_repository = AsyncFileEvaluationRepository(REPOSITORY_ROOT_PATH)\n", "\n", - "eval_logic = InstructArgillaEvaluationLogic()\n", + "eval_logic = InstructArgillaEvaluationLogic(fields, questions)\n", "evaluator = ArgillaEvaluator(\n", " dataset_repository,\n", " run_repository,\n", - " argilla_evaluation_repository,\n", + " evaluation_repository,\n", " \"instruct-evaluation\",\n", " eval_logic,\n", + " argilla_client=argilla_client,\n", + " workspace_id=workspace_id,\n", ")" ] }, @@ -378,7 +392,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "After setting up the `ArgillaEvaluator`, the `evaluate_runs` methods posts the records to the Argilla instance." + "After setting up the `ArgillaEvaluator`, the `sumit` methods posts the records to the Argilla instance." ] }, { @@ -394,22 +408,17 @@ " if overview.description == \"instruct-run\"\n", "][0]\n", "\n", - "try:\n", - " eval_overview = evaluator.evaluate_runs(run_id)\n", - " print(eval_overview)\n", "\n", - "except Exception as e:\n", - " eval_overview = None\n", - " print(str(e))" + "partial_eval_overview = evaluator.submit(run_id)\n", + "print(partial_eval_overview)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "While the evaluation says that 5 examples were successfully evaluated, no real evaluation has happened yet. \n", "If we try to perform an aggregation right now, it will have no evaluations, as none of the submitted records were evaluated by humans through Argilla yet. \n", - "The aggregation fetches only the results that were already evaluated.\n", + "The next steps fetches only results that have been evaluated already\n", "\n", "---\n", "\n", @@ -423,7 +432,7 @@ "metadata": {}, "outputs": [], "source": [ - "eval_id = eval_overview.id\n", + "eval_id = partial_eval_overview.id\n", "argilla_client.split_dataset(eval_id, n_splits=3)" ] }, @@ -432,17 +441,39 @@ "metadata": {}, "source": [ "These splits can then be filered by, as shown below. 
\n", - "\"drawing\"" + "\"drawing\"\n", + "\n", + "To finish the evaluation, we can retrieve the evaluated examples as follows:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "evaluation_repository = AsyncFileEvaluationRepository(REPOSITORY_ROOT_PATH)\n", + "\n", + "# either remember the id from before (eval_overview.id) or retrieve as below\n", + "eval_id = [\n", + " overview.id\n", + " for overview in evaluation_repository.partial_evaluation_overviews()\n", + " if overview.description == \"instruct-evaluation\"\n", + "][0]\n", + "\n", + "evaluation_overview = evaluator.retrieve(eval_id)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ + "\n", + "Note that all examples that are not yet evaluated in argilla are noted as `failed_examples` and not passed to the next step.\n", "\n", "---\n", "\n", - "For the Aggregation, we first need to define our `AggregationLogic` that has to take an `ArgillaEvaluation` as an input. As output, we use the `InstructAggregatedEvaluation` we defined earlier." + "For the Aggregation, we first need to define our `AggregationLogic` that takes our previously defined types as input and output. Here, we use `InstructEvaluation` and `InstructAggregatedEvaluation`." ] }, { @@ -452,11 +483,11 @@ "outputs": [], "source": [ "class InstructArgillaAggregationLogic(\n", - " AggregationLogic[ArgillaEvaluation, InstructAggregatedEvaluation]\n", + " AggregationLogic[InstructEvaluation, InstructAggregatedEvaluation]\n", "):\n", " def aggregate(\n", " self,\n", - " evaluations: Iterable[ArgillaEvaluation],\n", + " evaluations: Iterable[InstructEvaluation],\n", " ) -> InstructAggregatedEvaluation:\n", " evaluations = list(evaluations)\n", "\n", @@ -468,13 +499,12 @@ " )\n", "\n", " general_rating = sum(\n", - " cast(float, evaluation.responses[\"general_rating\"])\n", - " for evaluation in evaluations\n", + " evaluation.general_rating for evaluation in evaluations\n", " ) / len(evaluations)\n", "\n", - " fluency = sum(\n", - " cast(float, evaluation.responses[\"fluency\"]) for evaluation in evaluations\n", - " ) / len(evaluations)\n", + " fluency = sum(evaluation.fluency for evaluation in evaluations) / len(\n", + " evaluations\n", + " )\n", "\n", " return InstructAggregatedEvaluation(\n", " general_rating=general_rating,\n", @@ -490,7 +520,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "With this, we can define our `ArgillaAggregator` and retrieve the aggregation of all records that have been evaluated." + "With this, we can define our `Aggregator` and aggregate all evaluations. This step is the same as non-human evaluation." 
] }, { @@ -499,31 +529,17 @@ "metadata": {}, "outputs": [], "source": [ - "evaluation_repository = FileEvaluationRepository(REPOSITORY_ROOT_PATH)\n", - "argilla_evaluation_repository = ArgillaEvaluationRepository(\n", - " evaluation_repository,\n", - " argilla_client,\n", - " workspace_id, # we do not need to set questions and fields here\n", - ")\n", "aggregation_repository = FileAggregationRepository(REPOSITORY_ROOT_PATH)\n", - "# either remember the id from before (eval_overview.id) or retrieve as below\n", - "eval_id = [\n", - " overview.id\n", - " for overview in argilla_evaluation_repository.evaluation_overviews()\n", - " if overview.description == \"instruct-evaluation\"\n", - "][0]\n", "\n", - "\n", - "aggregator = ArgillaAggregator(\n", - " argilla_evaluation_repository,\n", + "aggregator = Aggregator(\n", + " evaluation_repository,\n", " aggregation_repository,\n", " \"instruct-aggregation\",\n", " aggregation_logic,\n", ")\n", "\n", - "if eval_overview:\n", - " output = aggregator.aggregate_evaluation(eval_id)\n", - " print(output.statistics)" + "output = aggregator.aggregate_evaluation(eval_id)\n", + "print(output.statistics)" ] }, { diff --git a/src/intelligence_layer/connectors/argilla/argilla_client.py b/src/intelligence_layer/connectors/argilla/argilla_client.py index 7ff167d22..d0a8f5617 100644 --- a/src/intelligence_layer/connectors/argilla/argilla_client.py +++ b/src/intelligence_layer/connectors/argilla/argilla_client.py @@ -129,12 +129,12 @@ def add_record(self, dataset_id: str, record: RecordData) -> None: @abstractmethod def evaluations(self, dataset_id: str) -> Iterable[ArgillaEvaluation]: - """Returns all submitted evaluations for the given dataset. + """Returns all human-evaluated evaluations for the given dataset. Args: dataset_id: the id of the dataset. Returns: - An `Iterable` over all submitted evaluations for the given dataset. + An `Iterable` over all human-evaluated evaluations for the given dataset. """ ... 
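As a small illustration of this contract, the following sketch polls the labelled results of a submitted dataset directly through the client. It is not part of the patch: the dataset ID is a placeholder, and the `"general_rating"` key assumes the `Question` name used in the notebook above.

```python
from intelligence_layer.connectors.argilla.argilla_client import DefaultArgillaClient

client = DefaultArgillaClient()
dataset_id = "dataset-id-returned-at-submission-time"  # placeholder

# Each ArgillaEvaluation carries the human answers in `responses`, keyed by Question name.
ratings = [
    float(evaluation.responses["general_rating"])
    for evaluation in client.evaluations(dataset_id)
]

if ratings:
    print(f"{len(ratings)} labelled records, mean rating {sum(ratings) / len(ratings):.2f}")
else:
    print("No records have been labelled yet.")
```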
diff --git a/src/intelligence_layer/evaluation/__init__.py b/src/intelligence_layer/evaluation/__init__.py index 2b978abba..a16f4d8df 100644 --- a/src/intelligence_layer/evaluation/__init__.py +++ b/src/intelligence_layer/evaluation/__init__.py @@ -4,16 +4,10 @@ ) from .aggregation.aggregator import AggregationLogic as AggregationLogic from .aggregation.aggregator import Aggregator as Aggregator -from .aggregation.argilla_aggregator import ( - AggregatedInstructComparison as AggregatedInstructComparison, -) -from .aggregation.argilla_aggregator import ArgillaAggregator as ArgillaAggregator -from .aggregation.argilla_aggregator import ( - InstructComparisonArgillaAggregationLogic as InstructComparisonArgillaAggregationLogic, -) -from .aggregation.argilla_aggregator import PlayerScore as PlayerScore from .aggregation.domain import AggregatedEvaluation as AggregatedEvaluation from .aggregation.domain import AggregationOverview as AggregationOverview +from .aggregation.elo import ComparisonAggregationLogic as ComparisonAggregationLogic +from .aggregation.elo import ComparisonEvaluation as ComparisonEvaluation from .aggregation.elo import EloCalculator as EloCalculator from .aggregation.elo import MatchOutcome as MatchOutcome from .aggregation.elo import WinRateCalculator as WinRateCalculator @@ -45,19 +39,6 @@ from .dataset.single_huggingface_dataset_repository import ( SingleHuggingfaceDatasetRepository as SingleHuggingfaceDatasetRepository, ) -from .evaluation.argilla_evaluation_repository import ( - ArgillaEvaluationRepository as ArgillaEvaluationRepository, -) -from .evaluation.argilla_evaluation_repository import ( - RecordDataSequence as RecordDataSequence, -) -from .evaluation.argilla_evaluator import ( - ArgillaEvaluationLogic as ArgillaEvaluationLogic, -) -from .evaluation.argilla_evaluator import ArgillaEvaluator as ArgillaEvaluator -from .evaluation.argilla_evaluator import ( - InstructComparisonArgillaEvaluationLogic as InstructComparisonArgillaEvaluationLogic, -) from .evaluation.domain import Evaluation as Evaluation from .evaluation.domain import EvaluationFailed as EvaluationFailed from .evaluation.domain import EvaluationOverview as EvaluationOverview @@ -66,11 +47,33 @@ from .evaluation.evaluation_repository import ( EvaluationRepository as EvaluationRepository, ) -from .evaluation.evaluator import EvaluationLogic as EvaluationLogic -from .evaluation.evaluator import Evaluator as Evaluator -from .evaluation.evaluator import ( +from .evaluation.evaluator.argilla_evaluator import ( + ArgillaEvaluationLogic as ArgillaEvaluationLogic, +) +from .evaluation.evaluator.argilla_evaluator import ArgillaEvaluator as ArgillaEvaluator +from .evaluation.evaluator.argilla_evaluator import ( + InstructComparisonArgillaEvaluationLogic as InstructComparisonArgillaEvaluationLogic, +) +from .evaluation.evaluator.argilla_evaluator import ( + RecordDataSequence as RecordDataSequence, +) +from .evaluation.evaluator.async_evaluator import ( + AsyncEvaluationRepository as AsyncEvaluationRepository, +) +from .evaluation.evaluator.evaluator import EvaluationLogic as EvaluationLogic +from .evaluation.evaluator.evaluator import Evaluator as Evaluator +from .evaluation.evaluator.evaluator import ( SingleOutputEvaluationLogic as SingleOutputEvaluationLogic, ) +from .evaluation.evaluator.incremental_evaluator import ( + IncrementalEvaluationLogic as IncrementalEvaluationLogic, +) +from .evaluation.evaluator.incremental_evaluator import ( + IncrementalEvaluator as IncrementalEvaluator, +) +from 
.evaluation.file_evaluation_repository import ( + AsyncFileEvaluationRepository as AsyncFileEvaluationRepository, +) from .evaluation.file_evaluation_repository import ( FileEvaluationRepository as FileEvaluationRepository, ) @@ -79,6 +82,9 @@ from .evaluation.graders import HighlightCoverageGrader as HighlightCoverageGrader from .evaluation.graders import LanguageMatchesGrader as LanguageMatchesGrader from .evaluation.graders import RougeGrader as RougeGrader +from .evaluation.in_memory_evaluation_repository import ( + AsyncInMemoryEvaluationRepository as AsyncInMemoryEvaluationRepository, +) from .evaluation.in_memory_evaluation_repository import ( InMemoryEvaluationRepository as InMemoryEvaluationRepository, ) diff --git a/src/intelligence_layer/evaluation/aggregation/aggregator.py b/src/intelligence_layer/evaluation/aggregation/aggregator.py index a03cf4210..61a0beffb 100644 --- a/src/intelligence_layer/evaluation/aggregation/aggregator.py +++ b/src/intelligence_layer/evaluation/aggregation/aggregator.py @@ -112,7 +112,7 @@ def __init__( def _get_types(self) -> Mapping[str, type]: """Type magic function that gets the actual types of the generic parameters. - Traverses the inheritance history of `BaseEvaluator`-subclass to find an actual type every time a TypeVar is replaced. + Traverses the inheritance history of `AggregationLogic`-subclass to find an actual type every time a TypeVar is replaced. Returns: Name of generic parameter to the type found. @@ -186,7 +186,7 @@ def aggregate_evaluation( ) -> AggregationOverview[AggregatedEvaluation]: """Aggregates all evaluations into an overview that includes high-level statistics. - Aggregates :class:`Evaluation`s according to the implementation of :func:`BaseEvaluator.aggregate`. + Aggregates :class:`Evaluation`s according to the implementation of :func:`AggregationLogic.aggregate`. Args: eval_ids: An overview of the evaluation to be aggregated. Does not include diff --git a/src/intelligence_layer/evaluation/aggregation/argilla_aggregator.py b/src/intelligence_layer/evaluation/aggregation/argilla_aggregator.py deleted file mode 100644 index 3785cec19..000000000 --- a/src/intelligence_layer/evaluation/aggregation/argilla_aggregator.py +++ /dev/null @@ -1,117 +0,0 @@ -import random -from abc import ABC -from collections import Counter -from typing import Iterable, Mapping - -from pydantic import BaseModel - -from intelligence_layer.connectors.argilla.argilla_client import ArgillaEvaluation -from intelligence_layer.evaluation.aggregation.accumulator import MeanAccumulator -from intelligence_layer.evaluation.aggregation.aggregation_repository import ( - AggregationRepository, -) -from intelligence_layer.evaluation.aggregation.aggregator import ( - AggregationLogic, - Aggregator, -) -from intelligence_layer.evaluation.aggregation.domain import AggregatedEvaluation -from intelligence_layer.evaluation.aggregation.elo import ( - EloCalculator, - MatchOutcome, - WinRateCalculator, -) -from intelligence_layer.evaluation.evaluation.argilla_evaluation_repository import ( - ArgillaEvaluationRepository, -) - - -class ArgillaAggregator( - Aggregator[ArgillaEvaluation, AggregatedEvaluation], - ABC, -): - """Aggregator used to aggregate Argilla (https://github.com/argilla-io/argilla) evaluations. - - You can fetch the results by using the `aggregate_evaluation` method. - - Arguments: - evaluation_repository: The repository that will be used to store evaluation results. - aggregation_repository: The repository that will be used to store aggregation results. 
- description: Human-readable description for the evaluator. - aggregation_logic: The logic to aggregate the evaluations. - - Generics: - ArgillaEvaluation: Interface of the metrics that come from the Argilla task`. - AggregatedEvaluation: The aggregated results of an evaluation run with a :class:`Dataset`. - """ - - def evaluation_type(self) -> type[ArgillaEvaluation]: # type: ignore - return ArgillaEvaluation - - def __init__( - self, - evaluation_repository: ArgillaEvaluationRepository, - aggregation_repository: AggregationRepository, - description: str, - aggregation_logic: AggregationLogic[ArgillaEvaluation, AggregatedEvaluation], - ) -> None: - super().__init__( - evaluation_repository, - aggregation_repository, - description, - aggregation_logic, - ) - - -class PlayerScore(BaseModel): - elo: float - elo_standard_error: float - win_rate: float - num_matches: int - - -class AggregatedInstructComparison(BaseModel): - scores: Mapping[str, PlayerScore] - - -class InstructComparisonArgillaAggregationLogic( - AggregationLogic[ArgillaEvaluation, AggregatedInstructComparison] -): - def aggregate( - self, evaluations: Iterable[ArgillaEvaluation] - ) -> AggregatedInstructComparison: - flattened_evaluations = [ - ( - evaluation.metadata["first"], - evaluation.metadata["second"], - MatchOutcome.from_rank_literal(int(evaluation.responses["winner"])), - ) - for evaluation in evaluations - ] - player_counter = Counter( - player for match in flattened_evaluations for player in [match[0], match[1]] - ) - player_counts = dict(player_counter) - players = player_counts.keys() - - accumulators = {p: MeanAccumulator() for p in players} - for _ in range(100): - elo_calc = EloCalculator(players) - random.shuffle(flattened_evaluations) - elo_calc.calculate(flattened_evaluations) - for p in players: - accumulators[p].add(elo_calc.ratings[p]) - - win_rate_calc = WinRateCalculator(players) - win_rate = win_rate_calc.calculate(flattened_evaluations) - - return AggregatedInstructComparison( - scores={ - p: PlayerScore( - elo=acc.extract(), - elo_standard_error=acc.standard_error(), - win_rate=win_rate[p], - num_matches=player_counts[p], - ) - for p, acc in accumulators.items() - }, - ) diff --git a/src/intelligence_layer/evaluation/aggregation/elo.py b/src/intelligence_layer/evaluation/aggregation/elo.py index 0fbdac2a1..76324630d 100644 --- a/src/intelligence_layer/evaluation/aggregation/elo.py +++ b/src/intelligence_layer/evaluation/aggregation/elo.py @@ -1,8 +1,13 @@ -from collections import defaultdict +import random +from collections import Counter, defaultdict from enum import Enum from typing import Iterable, Mapping, Sequence import numpy as np +from pydantic import BaseModel + +from intelligence_layer.evaluation.aggregation.accumulator import MeanAccumulator +from intelligence_layer.evaluation.aggregation.aggregator import AggregationLogic class MatchOutcome(str, Enum): @@ -96,3 +101,64 @@ def calculate( player: self.win_count[player] / match_count for player, match_count in self.match_count.items() } + + +class PlayerScore(BaseModel): + elo: float + elo_standard_error: float + win_rate: float + num_matches: int + + +class AggregatedComparison(BaseModel): + scores: Mapping[str, PlayerScore] + + +class ComparisonEvaluation(BaseModel): + first: str + second: str + winner: MatchOutcome + + +class ComparisonAggregationLogic( + AggregationLogic[ComparisonEvaluation, AggregatedComparison] +): + def aggregate( + self, evaluations: Iterable[ComparisonEvaluation] + ) -> AggregatedComparison: + 
flattened_evaluations = [ + ( + evaluation.first, + evaluation.second, + evaluation.winner, + ) + for evaluation in evaluations + ] + player_counter = Counter( + player for match in flattened_evaluations for player in [match[0], match[1]] + ) + player_counts = dict(player_counter) + players = player_counts.keys() + + accumulators = {p: MeanAccumulator() for p in players} + for _ in range(100): + elo_calc = EloCalculator(players) + random.shuffle(flattened_evaluations) + elo_calc.calculate(flattened_evaluations) + for p in players: + accumulators[p].add(elo_calc.ratings[p]) + + win_rate_calc = WinRateCalculator(players) + win_rate = win_rate_calc.calculate(flattened_evaluations) + + return AggregatedComparison( + scores={ + p: PlayerScore( + elo=acc.extract(), + elo_standard_error=acc.standard_error(), + win_rate=win_rate[p], + num_matches=player_counts[p], + ) + for p, acc in accumulators.items() + }, + ) diff --git a/src/intelligence_layer/evaluation/evaluation/argilla_evaluation_repository.py b/src/intelligence_layer/evaluation/evaluation/argilla_evaluation_repository.py deleted file mode 100644 index ec643578a..000000000 --- a/src/intelligence_layer/evaluation/evaluation/argilla_evaluation_repository.py +++ /dev/null @@ -1,166 +0,0 @@ -from itertools import chain -from typing import Optional, Sequence, cast -from uuid import uuid4 - -from pydantic import BaseModel - -from intelligence_layer.connectors.argilla.argilla_client import ( - ArgillaClient, - ArgillaEvaluation, - Field, - Question, - RecordData, -) -from intelligence_layer.evaluation.evaluation.domain import ( - Evaluation, - EvaluationOverview, - ExampleEvaluation, - FailedExampleEvaluation, -) -from intelligence_layer.evaluation.evaluation.evaluation_repository import ( - EvaluationRepository, -) - - -class RecordDataSequence(BaseModel): - records: Sequence[RecordData] - - -class ArgillaEvaluationRepository(EvaluationRepository): - """Evaluation repository used in the :class:`ArgillaEvaluator` and :class:`ArgillaAggregator`. - - Only an `EvaluationOverview` is stored in the `evaluation_repository`, while the `ExampleEvaluation`s themselves are stored in Argilla. - These `ExampleEvaluation`s are submitted to Argilla in the `store_example_evaluation` method. - - Args: - evaluation_repository: The evaluation repository to use internally. - argilla_client: Client to be used to connect to Argilla. - workspace_id: The workspace ID to save the results in. - Has to be created in Argilla beforehand. - fields: The Argilla fields of the dataset. Has to be set for use in the :class:`ArgillaEvaluator`. - questions: The questions that will be presented to the human evaluators in Argilla. Has to be set for use in the :class:`ArgillaEvaluator`. - """ - - def __init__( - self, - evaluation_repository: EvaluationRepository, - argilla_client: ArgillaClient, - workspace_id: str, - fields: Optional[Sequence[Field]] = None, - questions: Optional[Sequence[Question]] = None, - ) -> None: - super().__init__() - self._evaluation_repository = evaluation_repository - self._client = argilla_client - self._workspace_id = workspace_id - self._fields = fields - self._questions = questions - - def initialize_evaluation(self) -> str: - if self._fields is None or self._questions is None: - raise ValueError( - "Fields and questions have to be set to initialize the evaluation but are `None`." 
- ) - - return self._client.ensure_dataset_exists( - self._workspace_id, - str(uuid4()), - self._fields, - self._questions, - ) - - def store_evaluation_overview(self, overview: EvaluationOverview) -> None: - return self._evaluation_repository.store_evaluation_overview(overview) - - def evaluation_overview(self, evaluation_id: str) -> Optional[EvaluationOverview]: - return self._evaluation_repository.evaluation_overview(evaluation_id) - - def evaluation_overview_ids(self) -> Sequence[str]: - return sorted(self._evaluation_repository.evaluation_overview_ids()) - - def store_example_evaluation( - self, evaluation: ExampleEvaluation[Evaluation] - ) -> None: - if isinstance(evaluation.result, RecordDataSequence): - for record in evaluation.result.records: - self._client.add_record(evaluation.evaluation_id, record) - elif isinstance(evaluation.result, FailedExampleEvaluation): - self._evaluation_repository.store_example_evaluation(evaluation) - else: - raise TypeError( - "ArgillaEvaluationRepository does not support storing non-RecordDataSequence evaluations." - ) - - def example_evaluation( - self, evaluation_id: str, example_id: str, evaluation_type: type[Evaluation] - ) -> Optional[ExampleEvaluation[Evaluation]]: - return self._evaluation_repository.example_evaluation( - evaluation_id, example_id, evaluation_type - ) - - def example_evaluations( - self, evaluation_id: str, evaluation_type: type[Evaluation] - ) -> Sequence[ExampleEvaluation[Evaluation]]: - assert evaluation_type == ArgillaEvaluation - successful_example_evaluations = self.successful_example_evaluations( - evaluation_id, evaluation_type - ) - failed_example_evaluations = self.failed_example_evaluations( - evaluation_id, evaluation_type - ) - - return sorted( - chain(successful_example_evaluations, failed_example_evaluations), - key=lambda evaluation: evaluation.example_id, - ) - - def successful_example_evaluations( - self, evaluation_id: str, evaluation_type: type[Evaluation] - ) -> Sequence[ExampleEvaluation[Evaluation]]: - """Returns all successfully stored :class:`ExampleEvaluation`s for the given evaluation overview ID sorted by their example ID. - - Args: - evaluation_id: ID of the corresponding evaluation overview. - evaluation_type: Type of evaluations that the :class:`Evaluator` returned - in :func:`Evaluator.do_evaluate`. - - Returns: - A :class:`Sequence` of successful :class:`ExampleEvaluation`s. - """ - assert evaluation_type == ArgillaEvaluation - example_evaluations = [ - ExampleEvaluation( - evaluation_id=evaluation_id, - example_id=e.example_id, - # cast to Evaluation because mypy thinks ArgillaEvaluation cannot be Evaluation - result=cast(Evaluation, e), - ) - for e in self._client.evaluations(evaluation_id) - ] - return sorted(example_evaluations, key=lambda i: i.example_id) - - def failed_example_evaluations( - self, evaluation_id: str, evaluation_type: type[Evaluation] - ) -> Sequence[ExampleEvaluation[Evaluation]]: - """Returns all failed :class:`ExampleEvaluation`s sorted by their example ID. - - A failed example evaluation is an :class:`ExampleEvaluation` for which the storage process failed, e.g., because - the Argilla service was unresponsive. - - The failed examples are stored in the given evaluation repository and not in Argilla. - - Args: - evaluation_id: ID of the corresponding evaluation overview. - evaluation_type: Type of evaluations that the :class:`Evaluator` returned - in :func:`Evaluator.do_evaluate` - - Returns: - A `Sequence` of failed example evaluations. 
- """ - # If there are no failed examples, the dataset with the id was never created and fails on retrieval. - try: - return self._evaluation_repository.failed_example_evaluations( - evaluation_id, evaluation_type - ) - except ValueError: - return [] diff --git a/src/intelligence_layer/evaluation/evaluation/argilla_evaluator.py b/src/intelligence_layer/evaluation/evaluation/argilla_evaluator.py deleted file mode 100644 index 137847a2d..000000000 --- a/src/intelligence_layer/evaluation/evaluation/argilla_evaluator.py +++ /dev/null @@ -1,190 +0,0 @@ -import random -from abc import ABC, abstractmethod -from itertools import combinations -from typing import Mapping, Optional - -from intelligence_layer.connectors.argilla.argilla_client import ( - ArgillaClient, - ArgillaEvaluation, - Field, - Question, - RecordData, -) -from intelligence_layer.core import CompleteOutput, Input, InstructInput, Output -from intelligence_layer.evaluation.dataset.dataset_repository import DatasetRepository -from intelligence_layer.evaluation.dataset.domain import Example, ExpectedOutput -from intelligence_layer.evaluation.evaluation.argilla_evaluation_repository import ( - ArgillaEvaluationRepository, - RecordDataSequence, -) -from intelligence_layer.evaluation.evaluation.evaluation_repository import ( - EvaluationRepository, -) -from intelligence_layer.evaluation.evaluation.evaluator import ( - EvaluationLogic, - Evaluator, -) -from intelligence_layer.evaluation.run.domain import SuccessfulExampleOutput -from intelligence_layer.evaluation.run.run_repository import RunRepository - - -class ArgillaEvaluationLogic( - EvaluationLogic[Input, Output, ExpectedOutput, RecordDataSequence], ABC -): - def do_evaluate( - self, - example: Example[Input, ExpectedOutput], - *output: SuccessfulExampleOutput[Output], - ) -> RecordDataSequence: - return self._to_record(example, *output) - - @abstractmethod - def _to_record( - self, - example: Example[Input, ExpectedOutput], - *output: SuccessfulExampleOutput[Output], - ) -> RecordDataSequence: - """This method is responsible for translating the `Example` and `Output` of the task to :class:`RecordData` - - - Args: - example: The example to be translated. - output: The output of the example that was run. - """ - ... - - -class ArgillaEvaluator( - Evaluator[Input, Output, ExpectedOutput, ArgillaEvaluation], - ABC, -): - """Evaluator used to integrate with Argilla (https://github.com/argilla-io/argilla). - - Use this evaluator if you would like to easily do human eval. - This evaluator runs a dataset and sends the input and output to Argilla to be evaluated. - - Arguments: - dataset_repository: The repository with the examples that will be taken for the evaluation. - run_repository: The repository of the runs to evaluate. - evaluation_repository: The repository that will be used to store evaluation results. - description: Human-readable description for the evaluator. - evaluation_logic: The logic to use for evaluation. - - Generics: - Input: Interface to be passed to the :class:`Task` that shall be evaluated. - Output: Type of the output of the :class:`Task` to be evaluated. - ExpectedOutput: Output that is expected from the run with the supplied input. - ArgillaEvaluation: Interface of the metrics that come from the Argilla task`. 
- """ - - def __init__( - self, - dataset_repository: DatasetRepository, - run_repository: RunRepository, - evaluation_repository: ArgillaEvaluationRepository, - description: str, - evaluation_logic: ArgillaEvaluationLogic[Input, Output, ExpectedOutput], - ) -> None: - super().__init__( - dataset_repository, - run_repository, - evaluation_repository, - description, - evaluation_logic, # type: ignore - ) - - def evaluation_type(self) -> type[ArgillaEvaluation]: # type: ignore - return ArgillaEvaluation - - -class InstructComparisonArgillaEvaluationLogic( - ArgillaEvaluationLogic[InstructInput, CompleteOutput, None] -): - def __init__( - self, - workspace_id: str, - fields: Mapping[str, Field], - high_priority_runs: Optional[frozenset[str]] = None, - ) -> None: - self._workspace_id = workspace_id - self._fields = fields - self._high_priority_runs = high_priority_runs - - def _to_record( - self, - example: Example[InstructInput, None], - *outputs: SuccessfulExampleOutput[CompleteOutput], - ) -> RecordDataSequence: - pairs = combinations(outputs, 2) - return RecordDataSequence( - records=[ - self._create_record_data(example, first, second) - for [first, second] in pairs - if self._high_priority_runs is None - or any( - run_id in self._high_priority_runs - for run_id in [first.run_id, second.run_id] - ) - ] - ) - - def _create_record_data( - self, - example: Example[InstructInput, None], - first: SuccessfulExampleOutput[CompleteOutput], - second: SuccessfulExampleOutput[CompleteOutput], - ) -> RecordData: - if random.choice([True, False]): - first, second = second, first - return RecordData( - content={ - self._fields["KEY_INSTRUCTION"].name: example.input.instruction, - self._fields["KEY_INPUT"].name: example.input.input or "", - self._fields["KEY_RESPONSE_1"].name: first.output.completion, - self._fields["KEY_RESPONSE_2"].name: second.output.completion, - }, - example_id=example.id, - metadata={ - self._fields["KEY_RESPONSE_1"].name: first.run_id, - self._fields["KEY_RESPONSE_2"].name: second.run_id, - }, - ) - - -def create_instruct_comparison_argilla_evaluation_classes( - workspace_id: str, - evaluation_repository: EvaluationRepository, - argilla_client: ArgillaClient, - high_priority_runs: Optional[frozenset[str]] = None, -) -> tuple[InstructComparisonArgillaEvaluationLogic, ArgillaEvaluationRepository]: - KEY_INSTRUCTION = "instruction" - KEY_INPUT = "input" - KEY_RESPONSE_1 = "first" - KEY_RESPONSE_2 = "second" - KEY_QUESTION = "winner" - OPTIONS = [1, 2, 3] - - fields = { - "KEY_INSTRUCTION": Field(name=KEY_INSTRUCTION, title="Instruction"), - "KEY_INPUT": Field(name=KEY_INPUT, title="Input"), - "KEY_RESPONSE_1": Field(name=KEY_RESPONSE_1, title="Response 1"), - "KEY_RESPONSE_2": Field(name=KEY_RESPONSE_2, title="Response 2"), - } - questions = [ - Question( - name=KEY_QUESTION, - title="Which response is better?", - description="1: The first completion is better.\n2: The second completion is better.\n3: They are both equally good.", - options=OPTIONS, - ) - ] - - return InstructComparisonArgillaEvaluationLogic( - workspace_id, fields, high_priority_runs - ), ArgillaEvaluationRepository( - evaluation_repository, - argilla_client, - workspace_id, - list(fields.values()), - questions, - ) diff --git a/src/intelligence_layer/evaluation/evaluation/domain.py b/src/intelligence_layer/evaluation/evaluation/domain.py index e4cd7d254..31abda1a4 100644 --- a/src/intelligence_layer/evaluation/evaluation/domain.py +++ b/src/intelligence_layer/evaluation/evaluation/domain.py @@ -1,6 +1,6 @@ import 
traceback from datetime import datetime -from typing import Generic, Optional, TypeVar +from typing import Generic, TypeVar from pydantic import BaseModel, SerializeAsAny from rich.tree import Tree @@ -65,19 +65,65 @@ def _rich_render(self, skip_example_id: bool = False) -> Tree: return tree +class PartialEvaluationOverview(BaseModel, frozen=True): + """Overview of the un-aggregated results of evaluating a :class:`Task` on a dataset. + + Attributes: + run_overviews: Overviews of the runs that were evaluated. + id: The unique identifier of this evaluation. + start: The time when the evaluation run was started. + submitted_evaluation_count: The amount of evaluations that were submitted successfully. + description: human-readable for the evaluator that created the evaluation. + """ + + run_overviews: frozenset[RunOverview] + id: str + start_date: datetime + submitted_evaluation_count: int + description: str + + def __repr__(self) -> str: + return self.__str__() + + def __str__(self) -> str: + run_overview_str: str = "Run Overviews={\n" + comma_counter = 0 + for overview in self.run_overviews: + run_overview_str += f"{overview}" + if comma_counter < len(self.run_overviews) - 1: + run_overview_str += ", " + comma_counter += 1 + run_overview_str += "}\n" + + return ( + f"Evaluation Overview ID = {self.id}\n" + f"Start time = {self.start_date}\n" + f"Submitted Evaluations = {self.submitted_evaluation_count}\n" + f'Description = "{self.description}"\n' + f"{run_overview_str}" + ) + + class EvaluationOverview(BaseModel, frozen=True): """Overview of the un-aggregated results of evaluating a :class:`Task` on a dataset. Attributes: run_overviews: Overviews of the runs that were evaluated. id: The unique identifier of this evaluation. - start: The time when the evaluation run was started - description: human-readable for the evaluator that created the evaluation + start_date: The time when the evaluation run was started. + end_date: The time when the evaluation run was finished. + successful_evaluation_count: Number of successfully evaluated examples. + failed_evaluation_count: Number of examples that produced an error during evaluation. + Note: failed runs are skipped in the evaluation and therefore not counted as failures + description: human-readable for the evaluator that created the evaluation. 
""" run_overviews: frozenset[RunOverview] id: str - start: Optional[datetime] + start_date: datetime + end_date: datetime + successful_evaluation_count: int + failed_evaluation_count: int description: str def __repr__(self) -> str: @@ -95,7 +141,10 @@ def __str__(self) -> str: return ( f"Evaluation Overview ID = {self.id}\n" - f"Start time = {self.start}\n" + f"Start time = {self.start_date}\n" + f"End time = {self.end_date}\n" + f"Successful examples = {self.successful_evaluation_count}\n" + f"Failed examples = {self.failed_evaluation_count}\n" f'Description = "{self.description}"\n' f"{run_overview_str}" ) diff --git a/src/intelligence_layer/evaluation/evaluation/evaluator.py b/src/intelligence_layer/evaluation/evaluation/evaluator.py deleted file mode 100644 index c5b01a436..000000000 --- a/src/intelligence_layer/evaluation/evaluation/evaluator.py +++ /dev/null @@ -1,606 +0,0 @@ -import typing -from abc import ABC, abstractmethod -from concurrent.futures import ThreadPoolExecutor -from functools import lru_cache -from typing import ( - Generic, - Iterable, - Mapping, - Optional, - Sequence, - Tuple, - TypeVar, - cast, - final, - get_args, - get_origin, -) - -from tqdm import tqdm - -from intelligence_layer.core import Input, Output, utc_now -from intelligence_layer.evaluation.dataset.dataset_repository import DatasetRepository -from intelligence_layer.evaluation.dataset.domain import Example, ExpectedOutput -from intelligence_layer.evaluation.evaluation.domain import ( - Evaluation, - EvaluationOverview, - ExampleEvaluation, - FailedExampleEvaluation, -) -from intelligence_layer.evaluation.evaluation.evaluation_repository import ( - EvaluationRepository, -) -from intelligence_layer.evaluation.infrastructure.repository_navigator import ( - EvaluationLineage, - RepositoryNavigator, -) -from intelligence_layer.evaluation.run.domain import ( - ExampleOutput, - FailedExampleRun, - RunOverview, - SuccessfulExampleOutput, -) -from intelligence_layer.evaluation.run.run_repository import RunRepository - - -class EvaluationLogic(ABC, Generic[Input, Output, ExpectedOutput, Evaluation]): - @abstractmethod - def do_evaluate( - self, - example: Example[Input, ExpectedOutput], - *output: SuccessfulExampleOutput[Output], - ) -> Evaluation: - """Executes the evaluation for this specific example. - - Responsible for comparing the input & expected output of a task to the - actually generated output. - - Args: - example: Input data of :class:`Task` to produce the output. - output: Output of the :class:`Task`. - - Returns: - The metrics that come from the evaluated :class:`Task`. - """ - pass - - -class IncrementalEvaluationLogic( - EvaluationLogic[Input, Output, ExpectedOutput, Evaluation] -): - def __init__(self) -> None: - super().__init__() - self._previous_run_output_ids: list[set[str]] = [] - - def set_previous_run_output_ids( - self, previous_run_output_ids: list[set[str]] - ) -> None: - self._previous_run_output_ids = previous_run_output_ids - - def do_evaluate( - self, - example: Example[Input, ExpectedOutput], - *outputs: SuccessfulExampleOutput[Output], - ) -> Evaluation: - """Executes the evaluation for this specific example. - - Responsible for comparing the input & expected output of a task to the - actually generated output. The difference to the standard :class:`EvaluationLogic`'s `do_evaluate` is that - this method will separate already processed evaluation from new ones before handing them over to - `do_incremental_evaluate`. 
- - Args: - example: Input data of :class:`Task` to produce the output. - outputs: Outputs of the :class:`Task`. - - Returns: - :class:`Evaluation`: The metrics that come from the evaluated :class:`Task`. - """ - flattened_run_output_ids: set[str] = set() - evaluated_outputs = [] - for run_output_ids in self._previous_run_output_ids: - flattened_run_output_ids = flattened_run_output_ids.union(run_output_ids) - evaluated_outputs.append( - [output for output in outputs if output.run_id in run_output_ids] - ) - - new_outputs = [ - output - for output in outputs - if output.run_id not in flattened_run_output_ids - ] - return self.do_incremental_evaluate(example, new_outputs, evaluated_outputs) - - @abstractmethod - def do_incremental_evaluate( - self, - example: Example[Input, ExpectedOutput], - outputs: list[SuccessfulExampleOutput[Output]], - already_evaluated_outputs: list[list[SuccessfulExampleOutput[Output]]], - ) -> Evaluation: - pass - - -class SingleOutputEvaluationLogic( - EvaluationLogic[Input, Output, ExpectedOutput, Evaluation] -): - @final - def do_evaluate( - self, - example: Example[Input, ExpectedOutput], - *output: SuccessfulExampleOutput[Output], - ) -> Evaluation: - assert len(output) == 1 - return self.do_evaluate_single_output(example, output[0].output) - - @abstractmethod - def do_evaluate_single_output( - self, example: Example[Input, ExpectedOutput], output: Output - ) -> Evaluation: - pass - - -class Evaluator(Generic[Input, Output, ExpectedOutput, Evaluation]): - """Evaluator that can handle automatic evaluation scenarios. - - This evaluator should be used for automatic eval. A user still has to implement - :class:`EvaluationLogic`. - - - Arguments: - dataset_repository: The repository with the examples that will be taken for the evaluation. - run_repository: The repository of the runs to evaluate. - evaluation_repository: The repository that will be used to store evaluation results. - description: Human-readable description for the evaluator. - evaluation_logic: The logic to use for evaluation. - - Generics: - Input: Interface to be passed to the :class:`Task` that shall be evaluated. - Output: Type of the output of the :class:`Task` to be evaluated. - ExpectedOutput: Output that is expected from the run with the supplied input. - Evaluation: Interface of the metrics that come from the evaluated :class:`Task`. - """ - - def __init__( - self, - dataset_repository: DatasetRepository, - run_repository: RunRepository, - evaluation_repository: EvaluationRepository, - description: str, - evaluation_logic: EvaluationLogic[Input, Output, ExpectedOutput, Evaluation], - ) -> None: - self._dataset_repository = dataset_repository - self._run_repository = run_repository - self._evaluation_repository = evaluation_repository - self._evaluation_logic = evaluation_logic - self.description = description - - @lru_cache(maxsize=1) - def _get_types(self) -> Mapping[str, type]: - """Type magic function that gets the actual types of the generic parameters. - - Traverses the inheritance history of `BaseEvaluator`-subclass to find an actual type every time a TypeVar is replaced. - - Returns: - Name of generic parameter to the type found. 
- """ - - def is_eligible_subclass(parent: type) -> bool: - return hasattr(parent, "__orig_bases__") and issubclass( - parent, EvaluationLogic - ) - - def update_types() -> None: - num_types_set = 0 - for current_index, current_type in enumerate(current_types): - if type(current_type) is not TypeVar: - type_var_count = num_types_set - 1 - for element_index, element in enumerate(type_list): - if type(element) is TypeVar: - type_var_count += 1 - if type_var_count == current_index: - break - assert type_var_count == current_index - type_list[element_index] = current_type - num_types_set += 1 - - # mypy does not know __orig_bases__ - base_types = EvaluationLogic.__orig_bases__[1] # type: ignore - type_list: list[type | TypeVar] = list(get_args(base_types)) - possible_parent_classes = [ - p - for p in reversed(type(self._evaluation_logic).__mro__) - if is_eligible_subclass(p) - ] - for parent in possible_parent_classes: - # mypy does not know __orig_bases__ - for base in parent.__orig_bases__: # type: ignore - origin = get_origin(base) - if origin is None or not issubclass(origin, EvaluationLogic): - continue - current_types = list(get_args(base)) - update_types() - - return { - name: param_type - for name, param_type in zip( - (a.__name__ for a in get_args(base_types)), type_list - ) - if type(param_type) is not TypeVar - } - - def input_type(self) -> type[Input]: - try: - input_type = self._get_types()["Input"] - except KeyError: - raise TypeError(f"Alternatively overwrite input_type() in {type(self)}") - return cast(type[Input], input_type) - - def output_type(self) -> type[Output]: - """Returns the type of the evaluated task's output. - - This can be used to retrieve properly typed outputs of an evaluation run - from a :class:`EvaluationRepository` - - Returns: - the type of the evaluated task's output. - """ - try: - output_type = self._get_types()["Output"] - except KeyError: - raise TypeError(f"Alternatively overwrite output_type() in {type(self)}") - return cast(type[Output], output_type) - - def expected_output_type(self) -> type[ExpectedOutput]: - try: - expected_output_type = self._get_types()["ExpectedOutput"] - except KeyError: - raise TypeError( - f"Alternatively overwrite expected_output_type() in {type(self)}" - ) - return cast(type[ExpectedOutput], expected_output_type) - - def evaluation_type(self) -> type[Evaluation]: - """Returns the type of the evaluation result of an example. - - This can be used to retrieve properly typed evaluations of an evaluation run - from a :class:`EvaluationRepository` - - Returns: - Returns the type of the evaluation result of an example. - """ - try: - evaluation_type = self._get_types()["Evaluation"] - except KeyError: - raise TypeError( - f"Alternatively overwrite evaluation_type() in {type(self)}" - ) - return cast(type[Evaluation], evaluation_type) - - def evaluate_runs( - self, - *run_ids: str, - num_examples: Optional[int] = None, - abort_on_error: bool = False, - ) -> EvaluationOverview: - """Evaluates all generated outputs in the run. - - For each set of successful outputs in the referenced runs, - :func:`EvaluationLogic.do_evaluate` is called and eval metrics are produced & - stored in the provided :class:`EvaluationRepository`. - - Args: - run_ids: The runs to be evaluated. Each run is expected to have the same - dataset as input (which implies their tasks have the same input-type) - and their tasks have the same output-type. 
For each example in the - dataset referenced by the runs the outputs of all runs are collected - and if all of them were successful they are passed on to the implementation - specific evaluation. The method compares all run of the provided ids to each other. - num_examples: The number of examples which should be evaluated from the given runs. - Always the first n runs stored in the evaluation repository - abort_on_error: Flag to abort all evaluations when an error occurs. Defaults to False. - - Returns: - EvaluationOverview: An overview of the evaluation. Individual :class:`Evaluation`s will not be - returned but instead stored in the :class:`EvaluationRepository` provided in the - __init__. - """ - - def load_run_overview(run_id: str) -> RunOverview: - run_overview = self._run_repository.run_overview(run_id) - if not run_overview: - raise ValueError(f"No RunOverview found for run-id: {run_id}") - return run_overview - - if not run_ids: - raise ValueError("At least one run-id needs to be provided") - run_overviews = frozenset(load_run_overview(run_id) for run_id in run_ids) - if not all( - next(iter(run_overviews)).dataset_id == run_overview.dataset_id - for run_overview in run_overviews - ): - raise ValueError( - f"All run-overviews must reference the same dataset: {run_overviews}" - ) - eval_id = self._evaluation_repository.initialize_evaluation() - dataset_id = next(iter(run_overviews)).dataset_id - examples = self._dataset_repository.examples( - dataset_id, - self.input_type(), - self.expected_output_type(), - ) - if examples is None: - raise ValueError(f"Dataset: {dataset_id} not found") - start = utc_now() - - examples_zipped: Iterable[tuple[ExampleOutput[Output], ...]] = zip( - *( - self._run_repository.example_outputs( - run_overview.id, self.output_type() - ) - for run_overview in run_overviews - ), - strict=True, - ) - - def generate_evaluation_inputs() -> ( - Iterable[ - Tuple[ - Example[Input, ExpectedOutput], - str, - Sequence[SuccessfulExampleOutput[Output]], - ] - ] - ): - current_example = 0 - for example_outputs in examples_zipped: - successful_example_outputs = [ - typing.cast(SuccessfulExampleOutput[Output], output) - for output in example_outputs - if not isinstance(output.output, FailedExampleRun) - ] - if not successful_example_outputs: - continue - example_id = successful_example_outputs[0].example_id - assert all( - example_output.example_id == example_id - for example_output in successful_example_outputs - ) - - example = self._dataset_repository.example( - dataset_id, - example_id, - self.input_type(), - self.expected_output_type(), - ) - assert example is not None - - if num_examples and current_example >= num_examples: - break - current_example += 1 - - yield ( - example, - eval_id, - successful_example_outputs, - ) - - with ThreadPoolExecutor(max_workers=10) as executor: - list( # the list is needed to consume the iterator returned from the executor.map - tqdm( - executor.map( - lambda args: self.evaluate( - args[0], args[1], abort_on_error, *args[2] - ), - generate_evaluation_inputs(), - ), - desc="Evaluating", - ) - ) - - partial_overview = EvaluationOverview( - run_overviews=run_overviews, - id=eval_id, - start=start, - description=self.description, - ) - self._evaluation_repository.store_evaluation_overview(partial_overview) - - return partial_overview - - @final - def evaluate( - self, - example: Example[Input, ExpectedOutput], - evaluation_id: str, - abort_on_error: bool, - *example_outputs: SuccessfulExampleOutput[Output], - ) -> None: - try: - result: 
Evaluation | FailedExampleEvaluation = ( - self._evaluation_logic.do_evaluate( - example, - *example_outputs, - ) - ) - except Exception as e: - if abort_on_error: - raise e - print( - f'FAILED EVALUATION: example "{example.id}", {type(e).__qualname__}: "{e}"' - ) - result = FailedExampleEvaluation.from_exception(e) - self._evaluation_repository.store_example_evaluation( - ExampleEvaluation( - evaluation_id=evaluation_id, example_id=example.id, result=result - ) - ) - - def failed_evaluations( - self, evaluation_id: str - ) -> Iterable[EvaluationLineage[Input, ExpectedOutput, Output, Evaluation]]: - """Returns the `EvaluationLineage` objects for all failed example evalations that belong to the given evaluation ID. - - Args: - evaluation_id: The ID of the evaluation overview - - Returns: - :class:`Iterable` of :class:`EvaluationLineage`s. - """ - failed_example_evaluations = ( - self._evaluation_repository.failed_example_evaluations( - evaluation_id, evaluation_type=self.evaluation_type() - ) - ) - lineages = ( - self.evaluation_lineage(evaluation_id, output.example_id) - for output in failed_example_evaluations - ) - return (lineage for lineage in lineages if lineage is not None) - - def evaluation_lineages( - self, evaluation_id: str - ) -> Iterable[EvaluationLineage[Input, ExpectedOutput, Output, Evaluation]]: - """Wrapper for `RepositoryNagivator.evaluation_lineages`. - - Args: - evaluation_id: The id of the evaluation - - Returns: - An iterator over all :class:`EvaluationLineage`s for the given evaluation id. - """ - navigator = RepositoryNavigator( - self._dataset_repository, self._run_repository, self._evaluation_repository - ) - return navigator.evaluation_lineages( - evaluation_id=evaluation_id, - input_type=self.input_type(), - expected_output_type=self.expected_output_type(), - output_type=self.output_type(), - evaluation_type=self.evaluation_type(), - ) - - def evaluation_lineage( - self, evaluation_id: str, example_id: str - ) -> EvaluationLineage[Input, ExpectedOutput, Output, Evaluation] | None: - """Wrapper for `RepositoryNagivator.evaluation_lineage`. - - Args: - evaluation_id: The id of the evaluation - example_id: The id of the example of interest - - Returns: - The :class:`EvaluationLineage` for the given evaluation id and example id. - Returns `None` if the lineage is not complete because either an example, a run, or an evaluation does not exist. - """ - navigator = RepositoryNavigator( - self._dataset_repository, self._run_repository, self._evaluation_repository - ) - return navigator.evaluation_lineage( - evaluation_id=evaluation_id, - example_id=example_id, - input_type=self.input_type(), - expected_output_type=self.expected_output_type(), - output_type=self.output_type(), - evaluation_type=self.evaluation_type(), - ) - - -class IncrementalEvaluator(Evaluator[Input, Output, ExpectedOutput, Evaluation]): - """:class:`Evaluator` for evaluating additional runs on top of previous evaluations. Intended for use with :class:`IncrementalEvaluationLogic`. - - Args: - dataset_repository: The repository with the examples that will be taken for the evaluation. - run_repository: The repository of the runs to evaluate. - evaluation_repository: The repository that will be used to store evaluation results. - description: Human-readable description for the evaluator. - incremental_evaluation_logic: The logic to use for evaluation. - - Generics: - Input: Interface to be passed to the :class:`Task` that shall be evaluated. 
- Output: Type of the output of the :class:`Task` to be evaluated. - ExpectedOutput: Output that is expected from the run with the supplied input. - Evaluation: Interface of the metrics that come from the evaluated :class:`Task`. - """ - - def __init__( - self, - dataset_repository: DatasetRepository, - run_repository: RunRepository, - evaluation_repository: EvaluationRepository, - description: str, - incremental_evaluation_logic: IncrementalEvaluationLogic[ - Input, Output, ExpectedOutput, Evaluation - ], - ) -> None: - super().__init__( - dataset_repository=dataset_repository, - run_repository=run_repository, - evaluation_repository=evaluation_repository, - description=description, - evaluation_logic=incremental_evaluation_logic, - ) - - def evaluate_additional_runs( - self, - *run_ids: str, - previous_evaluation_ids: Optional[list[str]] = None, - num_examples: Optional[int] = None, - abort_on_error: bool = False, - ) -> EvaluationOverview: - """Evaluate all runs while considering which runs have already been evaluated according to `previous_evaluation_id`. - - For each set of successful outputs in the referenced runs, - :func:`EvaluationLogic.do_evaluate` is called and eval metrics are produced & - stored in the provided :class:`EvaluationRepository`. - - Args: - run_ids: The runs to be evaluated. Each run is expected to have the same - dataset as input (which implies their tasks have the same input-type) - and their tasks have the same output-type. For each example in the - dataset referenced by the runs the outputs of all runs are collected - and if all of them were successful they are passed on to the implementation - specific evaluation. The method compares all run of the provided ids to each other. - previous_evaluation_ids: IDs of previous evaluation to consider - num_examples: The number of examples which should be evaluated from the given runs. - Always the first n runs stored in the evaluation repository - abort_on_error: Flag to abort all evaluations when an error occurs. Defaults to False. - - Returns: - EvaluationOverview: An overview of the evaluation. Individual :class:`Evaluation`s will not be - returned but instead stored in the :class:`EvaluationRepository` provided in the - __init__. 
- """ - - previous_run_ids = [] - previous_evaluation_ids = previous_evaluation_ids or [] - - for previous_evaluation_id in previous_evaluation_ids: - prev_run_ids: set[str] = set() - lineages = self.evaluation_lineages(previous_evaluation_id) - for lineage in lineages: - for output in lineage.outputs: - prev_run_ids.add(output.run_id) - previous_run_ids.append(prev_run_ids) - - cast( - IncrementalEvaluationLogic[Input, Output, ExpectedOutput, Evaluation], - self._evaluation_logic, - ).set_previous_run_output_ids(previous_run_ids) - return super().evaluate_runs( - *run_ids, num_examples=num_examples, abort_on_error=abort_on_error - ) - - def evaluate_runs( - self, - *run_ids: str, - num_examples: Optional[int] = None, - abort_on_error: bool = False, - ) -> EvaluationOverview: - cast( - IncrementalEvaluationLogic[Input, Output, ExpectedOutput, Evaluation], - self._evaluation_logic, - ).set_previous_run_output_ids([]) - return super().evaluate_runs( - *run_ids, num_examples=num_examples, abort_on_error=abort_on_error - ) diff --git a/src/intelligence_layer/evaluation/evaluation/evaluator/argilla_evaluator.py b/src/intelligence_layer/evaluation/evaluation/evaluator/argilla_evaluator.py new file mode 100644 index 000000000..991a67654 --- /dev/null +++ b/src/intelligence_layer/evaluation/evaluation/evaluator/argilla_evaluator.py @@ -0,0 +1,311 @@ +import random +from abc import ABC, abstractmethod +from datetime import datetime +from itertools import combinations +from typing import Mapping, Optional, Sequence + +from pydantic import BaseModel + +from intelligence_layer.connectors.argilla.argilla_client import ( + ArgillaClient, + ArgillaEvaluation, + Field, + Question, + RecordData, +) +from intelligence_layer.core import CompleteOutput, Input, InstructInput, Output +from intelligence_layer.evaluation.aggregation.elo import ( + ComparisonEvaluation, + MatchOutcome, +) +from intelligence_layer.evaluation.dataset.dataset_repository import DatasetRepository +from intelligence_layer.evaluation.dataset.domain import Example, ExpectedOutput +from intelligence_layer.evaluation.evaluation.domain import ( + Evaluation, + EvaluationOverview, + ExampleEvaluation, + FailedExampleEvaluation, + PartialEvaluationOverview, +) +from intelligence_layer.evaluation.evaluation.evaluator.async_evaluator import ( + AsyncEvaluationRepository, + AsyncEvaluator, +) +from intelligence_layer.evaluation.evaluation.evaluator.base_evaluator import ( + EvaluationLogicBase, +) +from intelligence_layer.evaluation.run.domain import SuccessfulExampleOutput +from intelligence_layer.evaluation.run.run_repository import RunRepository + + +class RecordDataSequence(BaseModel): + records: Sequence[RecordData] + + +class ArgillaEvaluationLogic( + EvaluationLogicBase[Input, Output, ExpectedOutput, Evaluation], ABC +): + def __init__(self, fields: Mapping[str, Field], questions: Sequence[Question]): + self.fields = fields + self.questions = questions + + @abstractmethod + def to_record( + self, + example: Example[Input, ExpectedOutput], + *output: SuccessfulExampleOutput[Output], + ) -> RecordDataSequence: + """This method is responsible for translating the `Example` and `Output` of the task to :class:`RecordData`. + + The specific format depends on the `fields`. + + + Args: + example: The example to be translated. + output: The output of the example that was run. + + Returns: + A :class:`RecordDataSequence` that contains entries that should be evaluated in Argilla. + """ + ... 
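# Illustrative sketch (editor's aside, not part of this patch): a minimal concrete
# ArgillaEvaluationLogic wires its `fields`/`questions` in the constructor and then
# implements `to_record` (and `from_record`, defined just below). The field/question
# names ("output", "rating"), the placeholder str/None types and the RatingEvaluation
# model are assumptions made purely for illustration; BaseModel, Field, Question,
# RecordData and RecordDataSequence are already imported in this module.
#
# class RatingEvaluation(BaseModel):
#     rating: int
#
# class RatingArgillaEvaluationLogic(
#     ArgillaEvaluationLogic[str, str, None, RatingEvaluation]
# ):
#     def __init__(self) -> None:
#         super().__init__(
#             fields={"output": Field(name="output", title="Output")},
#             questions=[
#                 Question(
#                     name="rating",
#                     title="How good is the output?",
#                     description="1 (bad) to 3 (good).",
#                     options=[1, 2, 3],
#                 )
#             ],
#         )
#
#     def to_record(
#         self,
#         example: Example[str, None],
#         *output: SuccessfulExampleOutput[str],
#     ) -> RecordDataSequence:
#         # one Argilla record per run output of this example
#         return RecordDataSequence(
#             records=[
#                 RecordData(
#                     content={"output": o.output},
#                     example_id=example.id,
#                     metadata={"run_id": o.run_id},
#                 )
#                 for o in output
#             ]
#         )
#
#     def from_record(
#         self, argilla_evaluation: ArgillaEvaluation
#     ) -> RatingEvaluation:
#         # the question name ("rating") is the key in the responses mapping
#         return RatingEvaluation(
#             rating=int(argilla_evaluation.responses["rating"])
#         )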
+ + @abstractmethod + def from_record(self, argilla_evaluation: ArgillaEvaluation) -> Evaluation: + """This method takes the specific Argilla evaluation format and converts into a compatible :class:`Evaluation`. + + The format of argilla_evaluation.responses depends on the `questions` attribute. + Each `name` of a question will be a key in the `argilla_evaluation.responses` mapping. + + Args: + argilla_evaluation: Argilla-specific data for a single evaluation. + + Returns: + An :class:`Evaluation` that contains all evaluation specific data. + """ + + +class ArgillaEvaluator(AsyncEvaluator[Input, Output, ExpectedOutput, Evaluation]): + """Evaluator used to integrate with Argilla (https://github.com/argilla-io/argilla). + + Use this evaluator if you would like to easily do human eval. + This evaluator runs a dataset and sends the input and output to Argilla to be evaluated. + + Arguments: + dataset_repository: The repository with the examples that will be taken for the evaluation. + run_repository: The repository of the runs to evaluate. + evaluation_repository: The repository that will be used to store evaluation results. + description: Human-readable description for the evaluator. + evaluation_logic: The logic to use for evaluation. + argilla_client: The client to interface with argilla. + workspace_id: The argilla workspace id where datasets are created for evaluation. + + See the :class:`EvaluatorBase` for more information. + """ + + def __init__( + self, + dataset_repository: DatasetRepository, + run_repository: RunRepository, + evaluation_repository: AsyncEvaluationRepository, + description: str, + evaluation_logic: ArgillaEvaluationLogic[ + Input, Output, ExpectedOutput, Evaluation + ], + argilla_client: ArgillaClient, + workspace_id: str, + ) -> None: + super().__init__( + dataset_repository, + run_repository, + evaluation_repository, + description, + evaluation_logic, + ) + self._client = argilla_client + self._workspace_id = workspace_id + self._evaluation_logic: ArgillaEvaluationLogic[ + Input, Output, ExpectedOutput, Evaluation + ] + self._evaluation_repository: AsyncEvaluationRepository + + def submit( + self, + *run_ids: str, + num_examples: Optional[int] = None, + abort_on_error: bool = False, + ) -> PartialEvaluationOverview: + argilla_dataset_id = self._client.ensure_dataset_exists( + self._workspace_id, + dataset_name="name", + fields=list(self._evaluation_logic.fields.values()), + questions=self._evaluation_logic.questions, + ) + + run_overviews = self._load_run_overviews(*run_ids) + submit_count = 0 + for example, outputs in self._retrieve_eval_logic_input( + run_overviews, num_examples=num_examples + ): + record_sequence = self._evaluation_logic.to_record(example, *outputs) + for record in record_sequence.records: + try: + self._client.add_record(argilla_dataset_id, record) + submit_count += 1 + except Exception as e: + if abort_on_error: + raise e + evaluation = FailedExampleEvaluation.from_exception(e) + self._evaluation_repository.store_example_evaluation( + ExampleEvaluation( + evaluation_id=argilla_dataset_id, + example_id=example.id, + result=evaluation, + ) + ) + print( + f"Uploading a record to argilla failed with the following error:\n{e}" + ) + + partial_overview = PartialEvaluationOverview( + run_overviews=frozenset(run_overviews), + id=argilla_dataset_id, + start_date=datetime.now(), + submitted_evaluation_count=submit_count, + description=self.description, + ) + + self._evaluation_repository.store_partial_evaluation_overview(partial_overview) + return 
partial_overview + + def retrieve( + self, + partial_evaluation_id: str, + ) -> EvaluationOverview: + partial_overview = self._evaluation_repository.partial_evaluation_overview( + partial_evaluation_id + ) + if not partial_overview: + raise ValueError( + f"Partial overview for evaluation id {partial_evaluation_id} not found." + ) + + example_evaluations = [ + ExampleEvaluation( + evaluation_id=partial_evaluation_id, + example_id=example_evaluation.example_id, + # cast to Evaluation because mypy thinks ArgillaEvaluation cannot be Evaluation + result=self._evaluation_logic.from_record(example_evaluation), + ) + for example_evaluation in self._client.evaluations(partial_evaluation_id) + ] + evaluations = sorted(example_evaluations, key=lambda i: i.example_id) + + for evaluation in evaluations: + self._evaluation_repository.store_example_evaluation(evaluation) + num_failed_evaluations = len( + self._evaluation_repository.failed_example_evaluations( + partial_evaluation_id, self.evaluation_type() + ) + ) + num_not_yet_evaluated_evals = partial_overview.submitted_evaluation_count - len( + evaluations + ) + + overview = EvaluationOverview( + run_overviews=partial_overview.run_overviews, + id=partial_evaluation_id, + start_date=partial_overview.start_date, + description=partial_overview.description, + end_date=datetime.now(), + successful_evaluation_count=len(evaluations), + failed_evaluation_count=num_not_yet_evaluated_evals + + num_failed_evaluations, + ) + self._evaluation_repository.store_evaluation_overview(overview) + return overview + + +class InstructComparisonArgillaEvaluationLogic( + ArgillaEvaluationLogic[InstructInput, CompleteOutput, None, ComparisonEvaluation] +): + KEY_INSTRUCTION = "instruction" + KEY_INPUT = "input" + KEY_RESPONSE_1 = "first" + KEY_RESPONSE_2 = "second" + KEY_QUESTION = "winner" + OPTIONS = [1, 2, 3] + + def __init__( + self, + high_priority_runs: Optional[frozenset[str]] = None, + ) -> None: + self._high_priority_runs = high_priority_runs + super().__init__( + fields={ + "KEY_INSTRUCTION": Field( + name=self.KEY_INSTRUCTION, title="Instruction" + ), + "KEY_INPUT": Field(name=self.KEY_INPUT, title="Input"), + "KEY_RESPONSE_1": Field(name=self.KEY_RESPONSE_1, title="Response 1"), + "KEY_RESPONSE_2": Field(name=self.KEY_RESPONSE_2, title="Response 2"), + }, + questions=[ + Question( + name=self.KEY_QUESTION, + title="Which response is better?", + description="1: The first completion is better.\n2: The second completion is better.\n3: They are both equally good.", + options=self.OPTIONS, + ) + ], + ) + + def to_record( + self, + example: Example[InstructInput, None], + *outputs: SuccessfulExampleOutput[CompleteOutput], + ) -> RecordDataSequence: + pairs = combinations(outputs, 2) + return RecordDataSequence( + records=[ + self._create_record_data(example, first, second) + for [first, second] in pairs + if self._high_priority_runs is None + or any( + run_id in self._high_priority_runs + for run_id in [first.run_id, second.run_id] + ) + ] + ) + + def _create_record_data( + self, + example: Example[InstructInput, None], + first: SuccessfulExampleOutput[CompleteOutput], + second: SuccessfulExampleOutput[CompleteOutput], + ) -> RecordData: + if random.choice([True, False]): + first, second = second, first + return RecordData( + content={ + self.fields["KEY_INSTRUCTION"].name: example.input.instruction, + self.fields["KEY_INPUT"].name: example.input.input or "", + self.fields["KEY_RESPONSE_1"].name: first.output.completion, + self.fields["KEY_RESPONSE_2"].name: 
second.output.completion, + }, + example_id=example.id, + metadata={ + self.fields["KEY_RESPONSE_1"].name: first.run_id, + self.fields["KEY_RESPONSE_2"].name: second.run_id, + }, + ) + + def from_record( + self, argilla_evaluation: ArgillaEvaluation + ) -> ComparisonEvaluation: + return ComparisonEvaluation( + first=argilla_evaluation.metadata["first"], + second=argilla_evaluation.metadata["second"], + winner=MatchOutcome.from_rank_literal( + int(argilla_evaluation.responses["winner"]) + ), + ) diff --git a/src/intelligence_layer/evaluation/evaluation/evaluator/async_evaluator.py b/src/intelligence_layer/evaluation/evaluation/evaluator/async_evaluator.py new file mode 100644 index 000000000..8c280498e --- /dev/null +++ b/src/intelligence_layer/evaluation/evaluation/evaluator/async_evaluator.py @@ -0,0 +1,108 @@ +from abc import ABC, abstractmethod +from typing import Iterable, Optional, Sequence + +from intelligence_layer.core.task import Input, Output +from intelligence_layer.evaluation.dataset.domain import ExpectedOutput +from intelligence_layer.evaluation.evaluation.domain import ( + Evaluation, + EvaluationOverview, + PartialEvaluationOverview, +) +from intelligence_layer.evaluation.evaluation.evaluation_repository import ( + EvaluationRepository, +) +from intelligence_layer.evaluation.evaluation.evaluator.base_evaluator import ( + EvaluatorBase, +) + + +class AsyncEvaluator(EvaluatorBase[Input, Output, ExpectedOutput, Evaluation], ABC): + @abstractmethod + def submit( + self, + *run_ids: str, + num_examples: Optional[int] = None, + abort_on_error: bool = False, + ) -> PartialEvaluationOverview: + """Submits evaluations to external service to be evaluated. + + Failed submissions are saved as FailedExampleEvaluations. + + Args: + run_ids: The runs to be evaluated. Each run is expected to have the same + dataset as input (which implies their tasks have the same input-type) + and their tasks have the same output-type. For each example in the + dataset referenced by the runs the outputs of all runs are collected + and if all of them were successful they are passed on to the implementation + specific evaluation. The method compares all run of the provided ids to each other. + num_examples: The number of examples which should be evaluated from the given runs. + Always the first n runs stored in the evaluation repository. Defaults to None. + abort_on_error: Abort the whole submission process if a single submission fails. + Defaults to False. + + Returns: + A :class:`PartialEvaluationOverview` containing submission information. + """ + ... + + @abstractmethod + def retrieve(self, partial_overview_id: str) -> EvaluationOverview: + """Retrieves external evaluations and saves them to an evaluation repository. + + Failed or skipped submissions should be viewed as failed evaluations. + Evaluations that are submitted but not yet evaluated also count as failed evaluations. + + Args: + partial_overview_id: The id of the corresponding :class:`PartialEvaluationOverview`. + + Returns: + An :class:`EvaluationOverview` that describes the whole evaluation. + """ + ... + + +class AsyncEvaluationRepository(EvaluationRepository): + @abstractmethod + def store_partial_evaluation_overview( + self, partial_evaluation_overview: PartialEvaluationOverview + ) -> None: + """Stores an :class:`PartialEvaluationOverview`. + + Args: + partial_evaluation_overview: The partial overview to be persisted. + """ + ... 
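# Illustrative usage sketch (editor's aside, not part of this patch): how the
# asynchronous split is meant to be used end to end. `submit` pushes records to
# Argilla and stores a PartialEvaluationOverview in this repository; once annotators
# have labelled the records, `retrieve` turns the feedback into a full
# EvaluationOverview. The variable names (my_logic, client, the repositories) are
# assumptions for the example.
#
# evaluator = ArgillaEvaluator(
#     dataset_repository,
#     run_repository,
#     AsyncInMemoryEvaluationRepository(),
#     "human evaluation",
#     my_logic,          # an ArgillaEvaluationLogic subclass
#     client,            # an ArgillaClient
#     workspace_id="my-workspace",
# )
# partial_overview = evaluator.submit(run_overview.id)
# # ... human annotators label the submitted records in Argilla ...
# evaluation_overview = evaluator.retrieve(partial_overview.id)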
+ + @abstractmethod + def partial_evaluation_overview( + self, partial_evaluation_id: str + ) -> Optional[PartialEvaluationOverview]: + """Returns an :class:`PartialEvaluationOverview` for the given ID. + + Args: + partial_evaluation_id: ID of the partial evaluation overview to retrieve. + + Returns: + :class:`PartialEvaluationOverview` if it was found, `None` otherwise. + """ + ... + + def partial_evaluation_overviews(self) -> Iterable[PartialEvaluationOverview]: + """Returns all :class:`PartialEvaluationOverview`s sorted by their ID. + + Returns: + :class:`Iterable` of :class:`PartialEvaluationOverview`s. + """ + for eval_id in self.partial_evaluation_overview_ids(): + evaluation_overview = self.partial_evaluation_overview(eval_id) + if evaluation_overview is not None: + yield evaluation_overview + + @abstractmethod + def partial_evaluation_overview_ids(self) -> Sequence[str]: + """Returns sorted IDs of all stored :class:`PartialEvaluationOverview`s. + + Returns: + A :class:`Sequence` of the :class:`PartialEvaluationOverview` IDs. + """ + ... diff --git a/src/intelligence_layer/evaluation/evaluation/evaluator/base_evaluator.py b/src/intelligence_layer/evaluation/evaluation/evaluator/base_evaluator.py new file mode 100644 index 000000000..875642c07 --- /dev/null +++ b/src/intelligence_layer/evaluation/evaluation/evaluator/base_evaluator.py @@ -0,0 +1,376 @@ +from abc import ABC +from functools import lru_cache +from typing import ( + Generic, + Iterable, + Mapping, + Optional, + Sequence, + Tuple, + TypeVar, + cast, + get_args, + get_origin, +) + +from intelligence_layer.core import Input, Output +from intelligence_layer.evaluation.dataset.dataset_repository import DatasetRepository +from intelligence_layer.evaluation.dataset.domain import Example, ExpectedOutput +from intelligence_layer.evaluation.evaluation.domain import Evaluation +from intelligence_layer.evaluation.evaluation.evaluation_repository import ( + EvaluationRepository, +) +from intelligence_layer.evaluation.infrastructure.repository_navigator import ( + EvaluationLineage, + RepositoryNavigator, +) +from intelligence_layer.evaluation.run.domain import ( + ExampleOutput, + FailedExampleRun, + RunOverview, + SuccessfulExampleOutput, +) +from intelligence_layer.evaluation.run.run_repository import RunRepository + + +class EvaluationLogicBase(Generic[Input, Output, ExpectedOutput, Evaluation]): + pass + + +class EvaluatorBase(Generic[Input, Output, ExpectedOutput, Evaluation], ABC): + """Base class for Evaluators that can handle automatic evaluation scenarios. + + Provides methods for type inference and loading data from the repositories. + + Arguments: + dataset_repository: The repository with the examples that will be taken for the evaluation. + run_repository: The repository of the runs to evaluate. + evaluation_repository: The repository that will be used to store evaluation results. + description: Human-readable description for the evaluator. + evaluation_logic: The logic to use for evaluation. + + Generics: + Input: Type to be passed to the :class:`Task` as input. Part of `Example`. + Output: Type of the output of the :class:`Task` to be evaluated. Part of `ExampleOutput` + ExpectedOutput: Type that the `Output` will be compared against. Part of `Example`. + Evaluation: Type of the metrics that come from the evaluated :class:`Task`. 
Part of `ExampleEvaluation` + """ + + def __init__( + self, + dataset_repository: DatasetRepository, + run_repository: RunRepository, + evaluation_repository: EvaluationRepository, + description: str, + evaluation_logic: EvaluationLogicBase[ + Input, Output, ExpectedOutput, Evaluation + ], + ) -> None: + self._dataset_repository = dataset_repository + self._run_repository = run_repository + self._evaluation_repository = evaluation_repository + self._evaluation_logic = evaluation_logic + + self.description = description + + @lru_cache(maxsize=1) + def _get_types(self) -> Mapping[str, type]: + """Type magic function that gets the actual types of the generic parameters. + + Traverses the inheritance history of `EvaluationLogicBase`-subclasses to find an actual type every time a TypeVar is replaced. + + Returns: + Name of generic parameter to the type found. + """ + + def is_eligible_subclass(parent: type) -> bool: + return hasattr(parent, "__orig_bases__") and issubclass( + parent, EvaluationLogicBase + ) + + def update_types() -> None: + num_types_set = 0 + for current_index, current_type in enumerate(current_types): + if type(current_type) is not TypeVar: + type_var_count = num_types_set - 1 + for element_index, element in enumerate(type_list): + if type(element) is TypeVar: + type_var_count += 1 + if type_var_count == current_index: + break + assert type_var_count == current_index + type_list[element_index] = current_type + num_types_set += 1 + + # mypy does not know __orig_bases__ + base_types = EvaluationLogicBase.__orig_bases__[0] # type: ignore + type_list: list[type | TypeVar] = list(get_args(base_types)) + possible_parent_classes = [ + p + for p in reversed(type(self._evaluation_logic).__mro__) + if is_eligible_subclass(p) + ] + for parent in possible_parent_classes: + # mypy does not know __orig_bases__ + for base in parent.__orig_bases__: # type: ignore + origin = get_origin(base) + if origin is None or not issubclass(origin, EvaluationLogicBase): + continue + current_types = list(get_args(base)) + update_types() + + return { + name: param_type + for name, param_type in zip( + (a.__name__ for a in get_args(base_types)), type_list + ) + if type(param_type) is not TypeVar + } + + def input_type(self) -> type[Input]: + """Returns the type of the evaluated task's input. + + This can be used to retrieve properly typed :class:`Example`s of a dataset + from a :class:`DatasetRepository`. + + Returns: + The type of the evaluated task's input. + """ + try: + input_type = self._get_types()["Input"] + except KeyError: + raise TypeError(f"Alternatively overwrite input_type() in {type(self)}") + return cast(type[Input], input_type) + + def output_type(self) -> type[Output]: + """Returns the type of the evaluated task's output. + + This can be used to retrieve properly typed outputs of an evaluation run + from a :class:`RunRepository`. + + Returns: + The type of the evaluated task's output. + """ + try: + output_type = self._get_types()["Output"] + except KeyError: + raise TypeError(f"Alternatively overwrite output_type() in {type(self)}") + return cast(type[Output], output_type) + + def expected_output_type(self) -> type[ExpectedOutput]: + """Returns the type of the evaluated task's expected output. + + This can be used to retrieve properly typed :class:`Example`s of a dataset + from a :class:`DatasetRepository`. + + Returns: + The type of the evaluated task's expected output. 
+ """ + try: + expected_output_type = self._get_types()["ExpectedOutput"] + except KeyError: + raise TypeError( + f"Alternatively overwrite expected_output_type() in {type(self)}" + ) + return cast(type[ExpectedOutput], expected_output_type) + + def evaluation_type(self) -> type[Evaluation]: + """Returns the type of the evaluation result of an example. + + This can be used to retrieve properly typed evaluations of an evaluation run + from an :class:`EvaluationRepository` + + Returns: + Returns the type of the evaluation result of an example. + """ + try: + evaluation_type = self._get_types()["Evaluation"] + except KeyError: + raise TypeError( + f"Alternatively overwrite evaluation_type() in {type(self)}" + ) + return cast(type[Evaluation], evaluation_type) + + def _load_run_overviews(self, *run_ids: str) -> set[RunOverview]: + if not run_ids: + raise ValueError("At least one run-id needs to be provided") + run_overviews = set() + for run_id in run_ids: + run_overview = self._run_repository.run_overview(run_id) + if not run_overview: + raise ValueError(f"No RunOverview found for run-id: {run_id}") + run_overviews.add(run_overview) + return run_overviews + + def _raise_if_overviews_have_different_dataset( + self, run_overviews: set[RunOverview] + ) -> None: + if not all( + next(iter(run_overviews)).dataset_id == run_overview.dataset_id + for run_overview in run_overviews + ): + raise ValueError( + f"All run-overviews must reference the same dataset: {run_overviews}" + ) + + def _retrieve_example_outputs( + self, run_overviews: set[RunOverview] + ) -> Iterable[tuple[ExampleOutput[Output], ...]]: + # this uses the assumption that the example outputs are sorted and there is never a missing example + example_outputs_for_example: Iterable[tuple[ExampleOutput[Output], ...]] = zip( + *( + self._run_repository.example_outputs( + run_overview.id, self.output_type() + ) + for run_overview in run_overviews + ), + strict=True, + ) + + return example_outputs_for_example + + def _retrieve_examples( + self, dataset_id: str + ) -> Iterable[Example[Input, ExpectedOutput]]: + examples = self._dataset_repository.examples( + dataset_id, + self.input_type(), + self.expected_output_type(), + ) + if examples is None: + raise ValueError(f"Dataset: {dataset_id} not found") + + return examples + + def _generate_evaluation_inputs( + self, + examples: Iterable[Example[Input, ExpectedOutput]], + example_outputs_for_example: Iterable[tuple[ExampleOutput[Output], ...]], + num_examples: Optional[int], + ) -> Iterable[ + Tuple[ + Example[Input, ExpectedOutput], + Sequence[SuccessfulExampleOutput[Output]], + ] + ]: + current_example = 0 + + for example, example_outputs in zip(examples, example_outputs_for_example): + if any( + isinstance(output.output, FailedExampleRun) + for output in example_outputs + ): + continue + + successful_example_outputs = [ + cast(SuccessfulExampleOutput[Output], output) + for output in example_outputs + ] + + if num_examples and current_example >= num_examples: + break + current_example += 1 + + yield ( + example, + successful_example_outputs, + ) + + def _retrieve_eval_logic_input( + self, + run_overviews: set[RunOverview], + num_examples: Optional[int] = None, + ) -> Iterable[ + Tuple[ + Example[Input, ExpectedOutput], + Sequence[SuccessfulExampleOutput[Output]], + ] + ]: + """Create pairings of :class:`Example` and all corresponding :class:`ExampleOutputs`. 
+ + In case an Example is matched with a FailedExampleRun, that example is skipped, even if + there are other successful ExampleOutputs present for this example. + + Args: + run_overviews: Run overviews to gather data from. + num_examples: Maximum amount of examples to gather. Defaults to None. + + Returns: + Iterable over pairs of :class:`Example` and all corresponding :class:`ExampleOutputs`. + """ + self._raise_if_overviews_have_different_dataset(run_overviews) + example_outputs_for_example = self._retrieve_example_outputs(run_overviews) + dataset_id = next(iter(run_overviews)).dataset_id + examples = self._retrieve_examples(dataset_id) + return self._generate_evaluation_inputs( + examples, example_outputs_for_example, num_examples + ) + + def failed_evaluations( + self, evaluation_id: str + ) -> Iterable[EvaluationLineage[Input, ExpectedOutput, Output, Evaluation]]: + """Returns the `EvaluationLineage` objects for all failed example evalations that belong to the given evaluation ID. + + Args: + evaluation_id: The ID of the evaluation overview + + Returns: + :class:`Iterable` of :class:`EvaluationLineage`s. + """ + failed_example_evaluations = ( + self._evaluation_repository.failed_example_evaluations( + evaluation_id, evaluation_type=self.evaluation_type() + ) + ) + lineages = ( + self.evaluation_lineage(evaluation_id, output.example_id) + for output in failed_example_evaluations + ) + return (lineage for lineage in lineages if lineage is not None) + + def evaluation_lineages( + self, evaluation_id: str + ) -> Iterable[EvaluationLineage[Input, ExpectedOutput, Output, Evaluation]]: + """Wrapper for `RepositoryNagivator.evaluation_lineages`. + + Args: + evaluation_id: The id of the evaluation + + Returns: + An iterator over all :class:`EvaluationLineage`s for the given evaluation id. + """ + navigator = RepositoryNavigator( + self._dataset_repository, self._run_repository, self._evaluation_repository + ) + return navigator.evaluation_lineages( + evaluation_id=evaluation_id, + input_type=self.input_type(), + expected_output_type=self.expected_output_type(), + output_type=self.output_type(), + evaluation_type=self.evaluation_type(), + ) + + def evaluation_lineage( + self, evaluation_id: str, example_id: str + ) -> EvaluationLineage[Input, ExpectedOutput, Output, Evaluation] | None: + """Wrapper for `RepositoryNagivator.evaluation_lineage`. + + Args: + evaluation_id: The id of the evaluation + example_id: The id of the example of interest + + Returns: + The :class:`EvaluationLineage` for the given evaluation id and example id. + Returns `None` if the lineage is not complete because either an example, a run, or an evaluation does not exist. 
+ """ + navigator = RepositoryNavigator( + self._dataset_repository, self._run_repository, self._evaluation_repository + ) + return navigator.evaluation_lineage( + evaluation_id=evaluation_id, + example_id=example_id, + input_type=self.input_type(), + expected_output_type=self.expected_output_type(), + output_type=self.output_type(), + evaluation_type=self.evaluation_type(), + ) diff --git a/src/intelligence_layer/evaluation/evaluation/evaluator/evaluator.py b/src/intelligence_layer/evaluation/evaluation/evaluator/evaluator.py new file mode 100644 index 000000000..b22c21660 --- /dev/null +++ b/src/intelligence_layer/evaluation/evaluation/evaluator/evaluator.py @@ -0,0 +1,190 @@ +from abc import ABC, abstractmethod +from concurrent.futures import ThreadPoolExecutor +from typing import Optional, final + +from tqdm import tqdm + +from intelligence_layer.core import Input, Output, utc_now +from intelligence_layer.evaluation.dataset.dataset_repository import DatasetRepository +from intelligence_layer.evaluation.dataset.domain import Example, ExpectedOutput +from intelligence_layer.evaluation.evaluation.domain import ( + Evaluation, + EvaluationOverview, + ExampleEvaluation, + FailedExampleEvaluation, +) +from intelligence_layer.evaluation.evaluation.evaluation_repository import ( + EvaluationRepository, +) +from intelligence_layer.evaluation.evaluation.evaluator.base_evaluator import ( + EvaluationLogicBase, + EvaluatorBase, +) +from intelligence_layer.evaluation.run.domain import SuccessfulExampleOutput +from intelligence_layer.evaluation.run.run_repository import RunRepository + + +class EvaluationLogic( + ABC, EvaluationLogicBase[Input, Output, ExpectedOutput, Evaluation] +): + @abstractmethod + def do_evaluate( + self, + example: Example[Input, ExpectedOutput], + *output: SuccessfulExampleOutput[Output], + ) -> Evaluation: + """Executes the evaluation for this specific example. + + Responsible for comparing the input & expected output of a task to the + actually generated output. + + Args: + example: Input data of :class:`Task` to produce the output. + output: Output of the :class:`Task`. + + Returns: + The metrics that come from the evaluated :class:`Task`. + """ + pass + + +class SingleOutputEvaluationLogic( + EvaluationLogic[Input, Output, ExpectedOutput, Evaluation] +): + @final + def do_evaluate( + self, + example: Example[Input, ExpectedOutput], + *output: SuccessfulExampleOutput[Output], + ) -> Evaluation: + assert len(output) == 1 + return self.do_evaluate_single_output(example, output[0].output) + + @abstractmethod + def do_evaluate_single_output( + self, example: Example[Input, ExpectedOutput], output: Output + ) -> Evaluation: + pass + + +class Evaluator(EvaluatorBase[Input, Output, ExpectedOutput, Evaluation]): + """Evaluator designed for most evaluation tasks. Only supports synchronous evaluation. + + See the :class:`EvaluatorBase` for more information. 
+ """ + + def __init__( + self, + dataset_repository: DatasetRepository, + run_repository: RunRepository, + evaluation_repository: EvaluationRepository, + description: str, + evaluation_logic: EvaluationLogic[Input, Output, ExpectedOutput, Evaluation], + ) -> None: + super().__init__( + dataset_repository, + run_repository, + evaluation_repository, + description, + evaluation_logic, + ) + self._evaluation_logic: EvaluationLogic[ + Input, Output, ExpectedOutput, Evaluation + ] + + def evaluate_runs( + self, + *run_ids: str, + num_examples: Optional[int] = None, + abort_on_error: bool = False, + ) -> EvaluationOverview: + """Evaluates all generated outputs in the run. + + For each set of successful outputs in the referenced runs, + :func:`EvaluationLogic.do_evaluate` is called and eval metrics are produced & + stored in the provided :class:`EvaluationRepository`. + + Args: + run_ids: The runs to be evaluated. Each run is expected to have the same + dataset as input (which implies their tasks have the same input-type) + and their tasks have the same output-type. For each example in the + dataset referenced by the runs the outputs of all runs are collected + and if all of them were successful they are passed on to the implementation + specific evaluation. The method compares all run of the provided ids to each other. + num_examples: The number of examples which should be evaluated from the given runs. + Always the first n runs stored in the evaluation repository. Defaults to None. + abort_on_error: Flag to abort all evaluations when an error occurs. Defaults to False. + + Returns: + EvaluationOverview: An overview of the evaluation. Individual :class:`Evaluation`s will not be + returned but instead stored in the :class:`EvaluationRepository` provided in the + __init__. 
+ """ + + start = utc_now() + run_overviews = self._load_run_overviews(*run_ids) + eval_id = self._evaluation_repository.initialize_evaluation() + + with ThreadPoolExecutor(max_workers=10) as executor: + example_evaluations = list( # the list is needed to consume the iterator returned from the executor.map + tqdm( + executor.map( + lambda args: self.evaluate( + args[0], eval_id, abort_on_error, *args[1] + ), + self._retrieve_eval_logic_input( + run_overviews, num_examples=num_examples + ), + ), + desc="Evaluating", + ) + ) + + failed_evaluation_count = sum( + isinstance(example_evaluation, FailedExampleEvaluation) + for example_evaluation in example_evaluations + ) + + successful_evaluation_count = len(example_evaluations) - failed_evaluation_count + overview = EvaluationOverview( + run_overviews=frozenset(run_overviews), + id=eval_id, + start_date=start, + end_date=utc_now(), + successful_evaluation_count=successful_evaluation_count, + failed_evaluation_count=failed_evaluation_count, + description=self.description, + ) + self._evaluation_repository.store_evaluation_overview(overview) + + return overview + + @final + def evaluate( + self, + example: Example[Input, ExpectedOutput], + evaluation_id: str, + abort_on_error: bool, + *example_outputs: SuccessfulExampleOutput[Output], + ) -> Evaluation | FailedExampleEvaluation: + try: + result: Evaluation | FailedExampleEvaluation = ( + self._evaluation_logic.do_evaluate( + example, + *example_outputs, + ) + ) + except Exception as e: + if abort_on_error: + raise e + print( + f'FAILED EVALUATION: example "{example.id}", {type(e).__qualname__}: "{e}"' + ) + result = FailedExampleEvaluation.from_exception(e) + self._evaluation_repository.store_example_evaluation( + ExampleEvaluation( + evaluation_id=evaluation_id, example_id=example.id, result=result + ) + ) + + return result diff --git a/src/intelligence_layer/evaluation/evaluation/evaluator/incremental_evaluator.py b/src/intelligence_layer/evaluation/evaluation/evaluator/incremental_evaluator.py new file mode 100644 index 000000000..bcab4124a --- /dev/null +++ b/src/intelligence_layer/evaluation/evaluation/evaluator/incremental_evaluator.py @@ -0,0 +1,172 @@ +from abc import abstractmethod +from typing import Optional + +from intelligence_layer.core import Input, Output +from intelligence_layer.evaluation.dataset.dataset_repository import DatasetRepository +from intelligence_layer.evaluation.dataset.domain import Example, ExpectedOutput +from intelligence_layer.evaluation.evaluation.domain import ( + Evaluation, + EvaluationOverview, +) +from intelligence_layer.evaluation.evaluation.evaluation_repository import ( + EvaluationRepository, +) +from intelligence_layer.evaluation.evaluation.evaluator.evaluator import ( + EvaluationLogic, + Evaluator, +) +from intelligence_layer.evaluation.run.domain import SuccessfulExampleOutput +from intelligence_layer.evaluation.run.run_repository import RunRepository + + +class IncrementalEvaluationLogic( + EvaluationLogic[Input, Output, ExpectedOutput, Evaluation] +): + def __init__(self) -> None: + super().__init__() + self._previous_run_output_ids: list[set[str]] = [] + + def set_previous_run_output_ids( + self, previous_run_output_ids: list[set[str]] + ) -> None: + self._previous_run_output_ids = previous_run_output_ids + + def do_evaluate( + self, + example: Example[Input, ExpectedOutput], + *outputs: SuccessfulExampleOutput[Output], + ) -> Evaluation: + """Executes the evaluation for this specific example. 
+ + Responsible for comparing the input & expected output of a task to the + actually generated output. The difference to the standard :class:`EvaluationLogic`'s `do_evaluate` is that + this method will separate already processed evaluation from new ones before handing them over to + `do_incremental_evaluate`. + + Args: + example: Input data of :class:`Task` to produce the output. + outputs: Outputs of the :class:`Task`. + + Returns: + :class:`Evaluation`: The metrics that come from the evaluated :class:`Task`. + """ + flattened_run_output_ids: set[str] = set() + evaluated_outputs = [] + for run_output_ids in self._previous_run_output_ids: + flattened_run_output_ids = flattened_run_output_ids.union(run_output_ids) + evaluated_outputs.append( + [output for output in outputs if output.run_id in run_output_ids] + ) + + new_outputs = [ + output + for output in outputs + if output.run_id not in flattened_run_output_ids + ] + return self.do_incremental_evaluate(example, new_outputs, evaluated_outputs) + + @abstractmethod + def do_incremental_evaluate( + self, + example: Example[Input, ExpectedOutput], + outputs: list[SuccessfulExampleOutput[Output]], + already_evaluated_outputs: list[list[SuccessfulExampleOutput[Output]]], + ) -> Evaluation: + pass + + +class IncrementalEvaluator(Evaluator[Input, Output, ExpectedOutput, Evaluation]): + """:class:`Evaluator` for evaluating additional runs on top of previous evaluations. Intended for use with :class:`IncrementalEvaluationLogic`. + + Args: + dataset_repository: The repository with the examples that will be taken for the evaluation. + run_repository: The repository of the runs to evaluate. + evaluation_repository: The repository that will be used to store evaluation results. + description: Human-readable description for the evaluator. + incremental_evaluation_logic: The logic to use for evaluation. + + Generics: + Input: Interface to be passed to the :class:`Task` that shall be evaluated. + Output: Type of the output of the :class:`Task` to be evaluated. + ExpectedOutput: Output that is expected from the run with the supplied input. + Evaluation: Interface of the metrics that come from the evaluated :class:`Task`. + """ + + def __init__( + self, + dataset_repository: DatasetRepository, + run_repository: RunRepository, + evaluation_repository: EvaluationRepository, + description: str, + incremental_evaluation_logic: IncrementalEvaluationLogic[ + Input, Output, ExpectedOutput, Evaluation + ], + ) -> None: + super().__init__( + dataset_repository=dataset_repository, + run_repository=run_repository, + evaluation_repository=evaluation_repository, + description=description, + evaluation_logic=incremental_evaluation_logic, + ) + self._evaluation_logic: IncrementalEvaluationLogic[ + Input, Output, ExpectedOutput, Evaluation + ] + + def evaluate_additional_runs( + self, + *run_ids: str, + previous_evaluation_ids: Optional[list[str]] = None, + num_examples: Optional[int] = None, + abort_on_error: bool = False, + ) -> EvaluationOverview: + """Evaluate all runs while considering which runs have already been evaluated according to `previous_evaluation_id`. + + For each set of successful outputs in the referenced runs, + :func:`EvaluationLogic.do_evaluate` is called and eval metrics are produced & + stored in the provided :class:`EvaluationRepository`. + + Args: + run_ids: The runs to be evaluated. Each run is expected to have the same + dataset as input (which implies their tasks have the same input-type) + and their tasks have the same output-type. 
For each example in the + dataset referenced by the runs the outputs of all runs are collected + and if all of them were successful they are passed on to the implementation + specific evaluation. The method compares all run of the provided ids to each other. + previous_evaluation_ids: IDs of previous evaluation to consider + num_examples: The number of examples which should be evaluated from the given runs. + Always the first n runs stored in the evaluation repository. Defaults to None. + abort_on_error: Flag to abort all evaluations when an error occurs. Defaults to False. + + Returns: + EvaluationOverview: An overview of the evaluation. Individual :class:`Evaluation`s will not be + returned but instead stored in the :class:`EvaluationRepository` provided in the + __init__. + """ + + previous_run_ids = [] + previous_evaluation_ids = previous_evaluation_ids or [] + + for previous_evaluation_id in previous_evaluation_ids: + prev_run_ids: set[str] = set() + lineages = self.evaluation_lineages(previous_evaluation_id) + for lineage in lineages: + for output in lineage.outputs: + prev_run_ids.add(output.run_id) + previous_run_ids.append(prev_run_ids) + + self._evaluation_logic.set_previous_run_output_ids(previous_run_ids) + return super().evaluate_runs( + *run_ids, num_examples=num_examples, abort_on_error=abort_on_error + ) + + def evaluate_runs( + self, + *run_ids: str, + num_examples: Optional[int] = None, + abort_on_error: bool = False, + ) -> EvaluationOverview: + self._evaluation_logic.set_previous_run_output_ids([]) + return super().evaluate_runs( + *run_ids, num_examples=num_examples, abort_on_error=abort_on_error + ) diff --git a/src/intelligence_layer/evaluation/evaluation/file_evaluation_repository.py b/src/intelligence_layer/evaluation/evaluation/file_evaluation_repository.py index a701d7efe..5d7f713c0 100644 --- a/src/intelligence_layer/evaluation/evaluation/file_evaluation_repository.py +++ b/src/intelligence_layer/evaluation/evaluation/file_evaluation_repository.py @@ -7,11 +7,15 @@ Evaluation, EvaluationOverview, ExampleEvaluation, + PartialEvaluationOverview, ) from intelligence_layer.evaluation.evaluation.evaluation_repository import ( EvaluationRepository, SerializedExampleEvaluation, ) +from intelligence_layer.evaluation.evaluation.evaluator.async_evaluator import ( + AsyncEvaluationRepository, +) from intelligence_layer.evaluation.infrastructure.file_system_based_repository import ( FileSystemBasedRepository, ) @@ -26,7 +30,8 @@ def store_evaluation_overview(self, overview: EvaluationOverview) -> None: overview.model_dump_json(indent=2), create_parents=True, ) - # initialize the correct folders + # initialize the evaluation directory to make sure the evaluation "exists", even + # if we did not store any examples in it self.mkdir(self._eval_directory(overview.id)) def evaluation_overview(self, evaluation_id: str) -> Optional[EvaluationOverview]: @@ -112,3 +117,36 @@ def __init__(self, root_directory: Path) -> None: @staticmethod def path_to_str(path: Path) -> str: return str(path) + + +class AsyncFileEvaluationRepository( + FileEvaluationRepository, AsyncEvaluationRepository +): + def store_partial_evaluation_overview( + self, overview: PartialEvaluationOverview + ) -> None: + self.write_utf8( + self._partial_evaluation_overview_path(overview.id), + overview.model_dump_json(indent=2), + create_parents=True, + ) + # initialize the evaluation directory to make sure the evaluation "exists", even + # if we did not store any examples in it + 
self.mkdir(self._eval_directory(overview.id)) + + def partial_evaluation_overview( + self, evaluation_id: str + ) -> Optional[PartialEvaluationOverview]: + file_path = self._partial_evaluation_overview_path(evaluation_id) + if not self.exists(file_path): + return None + + content = self.read_utf8(file_path) + return PartialEvaluationOverview.model_validate_json(content) + + def partial_evaluation_overview_ids(self) -> Sequence[str]: + return sorted(self.file_names(self._eval_root_directory(), "partial_json")) + + def _partial_evaluation_overview_path(self, evaluation_id: str) -> Path: + path = self._eval_directory(evaluation_id).with_suffix(".partial_json") + return path diff --git a/src/intelligence_layer/evaluation/evaluation/in_memory_evaluation_repository.py b/src/intelligence_layer/evaluation/evaluation/in_memory_evaluation_repository.py index 2cb6f567a..aa3979f06 100644 --- a/src/intelligence_layer/evaluation/evaluation/in_memory_evaluation_repository.py +++ b/src/intelligence_layer/evaluation/evaluation/in_memory_evaluation_repository.py @@ -7,10 +7,14 @@ Evaluation, EvaluationOverview, ExampleEvaluation, + PartialEvaluationOverview, ) from intelligence_layer.evaluation.evaluation.evaluation_repository import ( EvaluationRepository, ) +from intelligence_layer.evaluation.evaluation.evaluator.async_evaluator import ( + AsyncEvaluationRepository, +) class InMemoryEvaluationRepository(EvaluationRepository): @@ -61,3 +65,28 @@ def example_evaluations( for example_evaluation in self._example_evaluations[evaluation_id] ] return sorted(example_evaluations, key=lambda i: i.example_id) + + +class AsyncInMemoryEvaluationRepository( + AsyncEvaluationRepository, InMemoryEvaluationRepository +): + def __init__(self) -> None: + super().__init__() + self._partial_evaluation_overviews: dict[str, PartialEvaluationOverview] = ( + dict() + ) + + def store_partial_evaluation_overview( + self, overview: PartialEvaluationOverview + ) -> None: + self._partial_evaluation_overviews[overview.id] = overview + if overview.id not in self._example_evaluations.keys(): + self._example_evaluations[overview.id] = [] + + def partial_evaluation_overview( + self, evaluation_id: str + ) -> Optional[PartialEvaluationOverview]: + return self._partial_evaluation_overviews.get(evaluation_id, None) + + def partial_evaluation_overview_ids(self) -> Sequence[str]: + return sorted(list(self._partial_evaluation_overviews.keys())) diff --git a/src/intelligence_layer/evaluation/run_evaluation.py b/src/intelligence_layer/evaluation/run_evaluation.py index fa84b2e26..afb79f201 100644 --- a/src/intelligence_layer/evaluation/run_evaluation.py +++ b/src/intelligence_layer/evaluation/run_evaluation.py @@ -16,7 +16,7 @@ from intelligence_layer.evaluation.dataset.file_dataset_repository import ( FileDatasetRepository, ) -from intelligence_layer.evaluation.evaluation.evaluator import Evaluator +from intelligence_layer.evaluation.evaluation.evaluator.evaluator import Evaluator from intelligence_layer.evaluation.evaluation.file_evaluation_repository import ( FileEvaluationRepository, ) diff --git a/tests/conftest.py b/tests/conftest.py index 9016f142e..e4eb66ae7 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -4,7 +4,6 @@ from aleph_alpha_client import Client, Image from dotenv import load_dotenv -from faker import Faker from pydantic import BaseModel from pytest import fixture @@ -20,6 +19,7 @@ ) from intelligence_layer.core import LuminousControlModel, NoOpTracer, Task, TaskSpan from intelligence_layer.evaluation import ( + 
AsyncInMemoryEvaluationRepository, InMemoryAggregationRepository, InMemoryDatasetRepository, InMemoryEvaluationRepository, @@ -110,26 +110,20 @@ def to_document(document_chunk: DocumentChunk) -> Document: class DummyStringInput(BaseModel): - input: str - - @classmethod - def any(cls) -> "DummyStringInput": - fake = Faker() - return cls(input=fake.text()) + input: str = "dummy-input" class DummyStringOutput(BaseModel): - output: str + output: str = "dummy-output" + - @classmethod - def any(cls) -> "DummyStringOutput": - fake = Faker() - return cls(output=fake.text()) +class DummyStringEvaluation(BaseModel): + evaluation: str = "dummy-evaluation" class DummyStringTask(Task[DummyStringInput, DummyStringOutput]): def do_run(self, input: DummyStringInput, task_span: TaskSpan) -> DummyStringOutput: - return DummyStringOutput.any() + return DummyStringOutput() @fixture @@ -155,3 +149,8 @@ def in_memory_evaluation_repository() -> InMemoryEvaluationRepository: @fixture def in_memory_aggregation_repository() -> InMemoryAggregationRepository: return InMemoryAggregationRepository() + + +@fixture() +def async_in_memory_evaluation_repository() -> AsyncInMemoryEvaluationRepository: + return AsyncInMemoryEvaluationRepository() diff --git a/tests/evaluation/conftest.py b/tests/evaluation/conftest.py index 8167d7c91..052351de5 100644 --- a/tests/evaluation/conftest.py +++ b/tests/evaluation/conftest.py @@ -28,7 +28,6 @@ FileRunRepository, InMemoryDatasetRepository, InMemoryRunRepository, - InstructComparisonArgillaAggregationLogic, Runner, RunOverview, ) @@ -135,7 +134,10 @@ def evaluation_overview( ) -> EvaluationOverview: return EvaluationOverview( id=evaluation_id, - start=utc_now(), + start_date=utc_now(), + end_date=utc_now(), + successful_evaluation_count=1, + failed_evaluation_count=1, run_overviews=frozenset([run_overview]), description="test evaluation overview 1", ) @@ -161,9 +163,7 @@ def aggregation_overview( @fixture def dummy_string_example() -> Example[DummyStringInput, DummyStringOutput]: - return Example( - input=DummyStringInput.any(), expected_output=DummyStringOutput.any() - ) + return Example(input=DummyStringInput(), expected_output=DummyStringOutput()) @fixture @@ -186,11 +186,6 @@ def dummy_runner( ) -@fixture -def argilla_aggregation_logic() -> InstructComparisonArgillaAggregationLogic: - return InstructComparisonArgillaAggregationLogic() - - class StubArgillaClient(ArgillaClient): _expected_workspace_id: str _expected_fields: Sequence[Field] @@ -201,7 +196,7 @@ class StubArgillaClient(ArgillaClient): def ensure_dataset_exists( self, workspace_id: str, - _: str, + dataset_name: str, fields: Sequence[Field], questions: Sequence[Question], ) -> str: diff --git a/tests/evaluation/test_argilla_evaluation_repository.py b/tests/evaluation/test_argilla_evaluation_repository.py deleted file mode 100644 index 60c1bf04d..000000000 --- a/tests/evaluation/test_argilla_evaluation_repository.py +++ /dev/null @@ -1,305 +0,0 @@ -from itertools import chain -from typing import Iterable, Sequence, Tuple -from uuid import uuid4 - -from pytest import fixture - -from intelligence_layer.connectors import ArgillaEvaluation, Field, Question, RecordData -from intelligence_layer.core import utc_now -from intelligence_layer.evaluation import ( - ArgillaEvaluationRepository, - EvaluationOverview, - ExampleEvaluation, - FailedExampleEvaluation, - InMemoryEvaluationRepository, - RecordDataSequence, - RunOverview, -) -from tests.evaluation.conftest import DummyEvaluation, StubArgillaClient - - -@fixture -def 
argilla_workspace_id() -> str: - return "workspace-id" - - -@fixture -def argilla_client_fields() -> Sequence[Field]: - return [] - - -@fixture -def argilla_client_questions() -> Sequence[Question]: - return [] - - -@fixture -def stub_argilla_client_with_defaults( - stub_argilla_client: StubArgillaClient, - argilla_workspace_id: str, - argilla_client_fields: Sequence[Field], - argilla_client_questions: Sequence[Question], -) -> StubArgillaClient: - stub_argilla_client._expected_workspace_id = argilla_workspace_id - stub_argilla_client._expected_fields = argilla_client_fields - stub_argilla_client._expected_questions = argilla_client_questions - return stub_argilla_client - - -@fixture -def argilla_evaluation_repository( - stub_argilla_client_with_defaults: StubArgillaClient, - in_memory_evaluation_repository: InMemoryEvaluationRepository, - argilla_workspace_id: str, - argilla_client_fields: Sequence[Field], - argilla_client_questions: Sequence[Question], -) -> ArgillaEvaluationRepository: - return ArgillaEvaluationRepository( - argilla_client=stub_argilla_client_with_defaults, - evaluation_repository=in_memory_evaluation_repository, - workspace_id=argilla_workspace_id, - fields=argilla_client_fields, - questions=argilla_client_questions, - ) - - -@fixture -def argilla_evaluation_repository_with_example_evaluations( - argilla_evaluation_repository: ArgillaEvaluationRepository, -) -> Tuple[ - str, - ArgillaEvaluationRepository, - list[ExampleEvaluation[RecordDataSequence]], - list[ExampleEvaluation[RecordDataSequence]], -]: - dataset_id = argilla_evaluation_repository.initialize_evaluation() - - successful_example_evaluation_ids = [str(uuid4()) for _ in range(10)] - - successful_example_evaluations = [] - for example_evaluation_id in successful_example_evaluation_ids: - example_evaluation = ExampleEvaluation( - evaluation_id=dataset_id, - example_id=example_evaluation_id, - result=RecordDataSequence( - records=[ - RecordData( - content={}, example_id=example_evaluation_id, metadata={} - ) - ] - ), - ) - successful_example_evaluations.append(example_evaluation) - argilla_evaluation_repository.store_example_evaluation(example_evaluation) - - failed_example_evaluation_ids = [str(uuid4()) for _ in range(10)] - failed_example_evaluations = [] - for example_evaluation_id in failed_example_evaluation_ids: - failed_example_evaluation: ExampleEvaluation[RecordDataSequence] = ( - ExampleEvaluation( - evaluation_id=dataset_id, - example_id=example_evaluation_id, - result=FailedExampleEvaluation(error_message="error"), - ) - ) - failed_example_evaluations.append(failed_example_evaluation) - ( - argilla_evaluation_repository.store_example_evaluation( - failed_example_evaluation - ) - ) - - return ( - dataset_id, - argilla_evaluation_repository, - successful_example_evaluations, - failed_example_evaluations, - ) - - -@fixture -def evaluation_overviews(run_overview: RunOverview) -> Iterable[EvaluationOverview]: - evaluation_overviews = [] - evaluation_ids = [str(uuid4()) for _ in range(10)] - for evaluation_id in evaluation_ids: - evaluation_overviews.append( - EvaluationOverview( - id=evaluation_id, - start=utc_now(), - run_overviews=frozenset([run_overview]), - description="test evaluation overview 1", - ) - ) - - return evaluation_overviews - - -def test_create_evaluation_dataset_returns_dataset_id( - argilla_evaluation_repository: ArgillaEvaluationRepository, -) -> None: - dataset_id = argilla_evaluation_repository.initialize_evaluation() - - assert dataset_id != "" - - -def 
test_evaluation_overview_returns_none_for_not_existing_id( - argilla_evaluation_repository: ArgillaEvaluationRepository, - evaluation_overview: EvaluationOverview, -) -> None: - argilla_evaluation_repository.store_evaluation_overview(evaluation_overview) - - stored_evaluation_overview = argilla_evaluation_repository.evaluation_overview( - "not-existing-id" - ) - - assert stored_evaluation_overview is None - - -def test_evaluation_overview_returns_evaluation_overview( - argilla_evaluation_repository: ArgillaEvaluationRepository, - evaluation_overview: EvaluationOverview, -) -> None: - argilla_evaluation_repository.store_evaluation_overview(evaluation_overview) - - stored_evaluation_overview = argilla_evaluation_repository.evaluation_overview( - evaluation_overview.id - ) - - assert stored_evaluation_overview == evaluation_overview - - -def test_evaluation_overviews_returns_sorted_evaluation_overviews( - argilla_evaluation_repository: ArgillaEvaluationRepository, - evaluation_overviews: Iterable[EvaluationOverview], -) -> None: - for evaluation_overview in evaluation_overviews: - argilla_evaluation_repository.store_evaluation_overview(evaluation_overview) - - stored_evaluation_overviews = list( - argilla_evaluation_repository.evaluation_overviews() - ) - - assert stored_evaluation_overviews == sorted( - evaluation_overviews, key=lambda overview: overview.id - ) - - -def test_evaluation_overview_ids_returns_sorted_ids( - argilla_evaluation_repository: ArgillaEvaluationRepository, - evaluation_overviews: Iterable[EvaluationOverview], -) -> None: - sorted_evaluation_overview_ids = sorted( - [overview.id for overview in evaluation_overviews] - ) - for evaluation_overview in evaluation_overviews: - argilla_evaluation_repository.store_evaluation_overview(evaluation_overview) - - evaluation_overview_ids = argilla_evaluation_repository.evaluation_overview_ids() - - assert evaluation_overview_ids == sorted_evaluation_overview_ids - - -def test_example_evaluations_returns_sorted_example_evaluations( - argilla_evaluation_repository: ArgillaEvaluationRepository, - argilla_evaluation_repository_with_example_evaluations: Tuple[ - str, - ArgillaEvaluationRepository, - list[ExampleEvaluation[DummyEvaluation]], - list[ExampleEvaluation[DummyEvaluation]], - ], -) -> None: - ( - dataset_id, - argilla_evaluation_repository, - successful_evaluation_examples, - failed_evaluation_examples, - ) = argilla_evaluation_repository_with_example_evaluations - all_sorted_evaluation_examples = sorted( - chain(successful_evaluation_examples, failed_evaluation_examples), - key=lambda example: example.example_id, - ) - - example_evaluations = argilla_evaluation_repository.example_evaluations( - dataset_id, ArgillaEvaluation - ) - - assert len(example_evaluations) == len(all_sorted_evaluation_examples) - for i, example_evaluation in enumerate(example_evaluations): - assert ( - example_evaluation.example_id - == all_sorted_evaluation_examples[i].example_id - ) - assert ( - example_evaluation.evaluation_id - == all_sorted_evaluation_examples[i].evaluation_id - ) - - -def test_successful_example_evaluations_returns_sorted_successful_example_evaluations( - argilla_evaluation_repository: ArgillaEvaluationRepository, - argilla_evaluation_repository_with_example_evaluations: Tuple[ - str, - ArgillaEvaluationRepository, - list[ExampleEvaluation[DummyEvaluation]], - list[ExampleEvaluation[DummyEvaluation]], - ], -) -> None: - ( - dataset_id, - argilla_evaluation_repository, - successful_evaluation_examples, - failed_evaluation_examples, 
- ) = argilla_evaluation_repository_with_example_evaluations - sorted_successful_evaluation_examples = sorted( - successful_evaluation_examples, key=lambda example: example.example_id - ) - - example_evaluations = argilla_evaluation_repository.successful_example_evaluations( - dataset_id, ArgillaEvaluation - ) - - assert len(example_evaluations) == len(sorted_successful_evaluation_examples) - for i, example_evaluation in enumerate(example_evaluations): - assert ( - example_evaluation.example_id - == sorted_successful_evaluation_examples[i].example_id - ) - assert ( - example_evaluation.evaluation_id - == sorted_successful_evaluation_examples[i].evaluation_id - ) - - -def test_failed_example_evaluations_returns_sorted_failed_example_evaluations( - argilla_evaluation_repository: ArgillaEvaluationRepository, - argilla_evaluation_repository_with_example_evaluations: Tuple[ - str, - ArgillaEvaluationRepository, - list[ExampleEvaluation[DummyEvaluation]], - list[ExampleEvaluation[DummyEvaluation]], - ], -) -> None: - ( - dataset_id, - argilla_evaluation_repository, - successful_evaluation_examples, - failed_evaluation_examples, - ) = argilla_evaluation_repository_with_example_evaluations - sorted_failed_evaluation_examples = sorted( - failed_evaluation_examples, key=lambda example: example.example_id - ) - - example_evaluations = argilla_evaluation_repository.failed_example_evaluations( - dataset_id, ArgillaEvaluation - ) - - assert len(example_evaluations) == len(sorted_failed_evaluation_examples) - for i, example_evaluation in enumerate(example_evaluations): - assert ( - example_evaluation.example_id - == sorted_failed_evaluation_examples[i].example_id - ) - assert ( - example_evaluation.evaluation_id - == sorted_failed_evaluation_examples[i].evaluation_id - ) diff --git a/tests/evaluation/test_argilla_evaluator.py b/tests/evaluation/test_argilla_evaluator.py index ecee6a7c3..ee4ea9d2b 100644 --- a/tests/evaluation/test_argilla_evaluator.py +++ b/tests/evaluation/test_argilla_evaluator.py @@ -1,46 +1,39 @@ import random -from typing import Iterable, Sequence, cast +from typing import Iterable, Sequence +from uuid import uuid4 +import pytest from pytest import fixture -from intelligence_layer.connectors import ArgillaEvaluation, Field, Question, RecordData +from intelligence_layer.connectors import ( + ArgillaClient, + ArgillaEvaluation, + Field, + Question, + RecordData, +) from intelligence_layer.evaluation import ( - AggregationLogic, - ArgillaAggregator, ArgillaEvaluationLogic, - ArgillaEvaluationRepository, ArgillaEvaluator, + AsyncInMemoryEvaluationRepository, + ComparisonAggregationLogic, + ComparisonEvaluation, + DatasetRepository, Example, - InMemoryAggregationRepository, InMemoryDatasetRepository, - InMemoryEvaluationRepository, InMemoryRunRepository, - InstructComparisonArgillaAggregationLogic, + MatchOutcome, RecordDataSequence, Runner, SuccessfulExampleOutput, ) -from tests.conftest import DummyStringInput, DummyStringOutput, DummyStringTask -from tests.evaluation.conftest import DummyAggregatedEvaluation, StubArgillaClient - - -class DummyStringTaskArgillaAggregationLogic( - AggregationLogic[ - ArgillaEvaluation, - DummyAggregatedEvaluation, - ] -): - def aggregate( - self, - evaluations: Iterable[ArgillaEvaluation], - ) -> DummyAggregatedEvaluation: - evaluations = list(evaluations) - total_human_score = sum( - cast(float, a.responses["human-score"]) for a in evaluations - ) - return DummyAggregatedEvaluation( - score=total_human_score / len(evaluations), - ) +from tests.conftest 
import ( + DummyStringEvaluation, + DummyStringInput, + DummyStringOutput, + DummyStringTask, +) +from tests.evaluation.conftest import StubArgillaClient class DummyStringTaskArgillaEvaluationLogic( @@ -48,9 +41,23 @@ class DummyStringTaskArgillaEvaluationLogic( DummyStringInput, DummyStringOutput, DummyStringOutput, + DummyStringEvaluation, ] ): - def _to_record( + def __init__(self) -> None: + super().__init__( + fields={ + "output": Field(name="output", title="Output"), + "input": Field(name="input", title="Input"), + }, + questions=[ + Question( + name="name", title="title", description="description", options=[0] + ) + ], + ) + + def to_record( self, example: Example[DummyStringInput, DummyStringOutput], *output: SuccessfulExampleOutput[DummyStringOutput], @@ -69,6 +76,93 @@ def _to_record( ] ) + def from_record( + self, argilla_evaluation: ArgillaEvaluation + ) -> DummyStringEvaluation: + return DummyStringEvaluation() + + +class CustomException(Exception): + pass + + +class DummyArgillaClient(ArgillaClient): + _datasets: dict[str, list[RecordData]] = {} + _score = 3.0 + + def ensure_dataset_exists( + self, + workspace_id: str, + dataset_name: str, + fields: Sequence[Field], + questions: Sequence[Question], + ) -> str: + dataset_id = str(uuid4()) + self._datasets[dataset_id] = [] + return dataset_id + + def add_record(self, dataset_id: str, record: RecordData) -> None: + if dataset_id not in self._datasets.keys(): + raise Exception("Add record: dataset not found") + self._datasets[dataset_id].append(record) + + def evaluations(self, dataset_id: str) -> Iterable[ArgillaEvaluation]: + dataset = self._datasets.get(dataset_id) + assert dataset + return [ + ArgillaEvaluation( + example_id=record.example_id, + record_id="ignored", + responses={"human-score": self._score}, + metadata=dict(), + ) + for record in dataset + ] + + def split_dataset(self, dataset_id: str, n_splits: int) -> None: + raise NotImplementedError + + +class FailedEvaluationDummyArgillaClient(ArgillaClient): + """fails on first upload, only returns 1 evaluated evaluation""" + + _upload_count = 0 + _datasets: dict[str, list[RecordData]] = {} + + def ensure_dataset_exists( + self, + workspace_id: str, + dataset_name: str, + fields: Sequence[Field], + questions: Sequence[Question], + ) -> str: + dataset_id = str(uuid4()) + self._datasets[dataset_id] = [] + + return dataset_id + + def add_record(self, dataset_id: str, record: RecordData) -> None: + if self._upload_count == 0: + self._upload_count += 1 + raise CustomException("First upload fails") + self._datasets[dataset_id].append(record) + + def evaluations(self, dataset_id: str) -> Iterable[ArgillaEvaluation]: + dataset = self._datasets.get(dataset_id) + assert dataset + record = dataset[0] + return [ + ArgillaEvaluation( + example_id=record.example_id, + record_id="ignored", + responses={"human-score": 0}, + metadata=dict(), + ) + ] + + def split_dataset(self, dataset_id: str, n_splits: int) -> None: + raise NotImplementedError + @fixture def arg() -> StubArgillaClient: @@ -95,62 +189,29 @@ def argilla_fields() -> Sequence[Field]: ] -@fixture -def argilla_evaluation_repository( - in_memory_evaluation_repository: InMemoryEvaluationRepository, - stub_argilla_client: StubArgillaClient, - argilla_questions: Sequence[Question], - argilla_fields: Sequence[Field], -) -> ArgillaEvaluationRepository: - stub_argilla_client._expected_workspace_id = "workspace-id" - stub_argilla_client._expected_questions = argilla_questions - stub_argilla_client._expected_fields = argilla_fields - 
- workspace_id = stub_argilla_client._expected_workspace_id - - return ArgillaEvaluationRepository( - in_memory_evaluation_repository, - stub_argilla_client, - workspace_id, - argilla_fields, - argilla_questions, - ) - - @fixture def string_argilla_evaluator( in_memory_dataset_repository: InMemoryDatasetRepository, in_memory_run_repository: InMemoryRunRepository, - argilla_evaluation_repository: ArgillaEvaluationRepository, + async_in_memory_evaluation_repository: AsyncInMemoryEvaluationRepository, ) -> ArgillaEvaluator[ DummyStringInput, DummyStringOutput, DummyStringOutput, + DummyStringEvaluation, ]: evaluator = ArgillaEvaluator( in_memory_dataset_repository, in_memory_run_repository, - argilla_evaluation_repository, + async_in_memory_evaluation_repository, "dummy-string-task", DummyStringTaskArgillaEvaluationLogic(), + StubArgillaClient(), + "workspace-id", ) return evaluator -@fixture -def string_argilla_aggregator( - argilla_evaluation_repository: ArgillaEvaluationRepository, - in_memory_aggregation_repository: InMemoryAggregationRepository, -) -> ArgillaAggregator[DummyAggregatedEvaluation]: - aggregator = ArgillaAggregator( - argilla_evaluation_repository, - in_memory_aggregation_repository, - "dummy-string-task", - DummyStringTaskArgillaAggregationLogic(), - ) - return aggregator - - @fixture def string_argilla_runner( dummy_string_task: DummyStringTask, @@ -165,71 +226,115 @@ def string_argilla_runner( ) -def test_argilla_evaluator_can_do_sync_evaluation( - string_argilla_evaluator: ArgillaEvaluator[ - DummyStringInput, - DummyStringOutput, - DummyStringOutput, - ], +def test_argilla_evaluator_can_submit_evals_to_argilla( string_argilla_runner: Runner[DummyStringInput, DummyStringOutput], string_dataset_id: str, + in_memory_dataset_repository: InMemoryDatasetRepository, + in_memory_run_repository: InMemoryRunRepository, + async_in_memory_evaluation_repository: AsyncInMemoryEvaluationRepository, ) -> None: - argilla_client = cast( - StubArgillaClient, - string_argilla_evaluator._evaluation_repository._client, # type: ignore + evaluator = ArgillaEvaluator( + in_memory_dataset_repository, + in_memory_run_repository, + async_in_memory_evaluation_repository, + "dummy-string-task", + DummyStringTaskArgillaEvaluationLogic(), + DummyArgillaClient(), + workspace_id="workspace-id", ) run_overview = string_argilla_runner.run_dataset(string_dataset_id) - eval_overview = string_argilla_evaluator.evaluate_runs(run_overview.id) - examples_iter = string_argilla_evaluator._dataset_repository.examples( - string_dataset_id, DummyStringInput, DummyStringOutput + + partial_evaluation_overview = evaluator.submit(run_overview.id) + assert partial_evaluation_overview.submitted_evaluation_count == 1 + + eval_overview = evaluator.retrieve(partial_evaluation_overview.id) + + assert eval_overview.end_date is not None + assert eval_overview.successful_evaluation_count == 1 + assert eval_overview.failed_evaluation_count == 0 + + assert ( + len( + async_in_memory_evaluation_repository.example_evaluations( + eval_overview.id, DummyStringOutput + ) + ) + == 1 ) - assert examples_iter is not None - assert eval_overview.id in argilla_client._datasets - saved_dataset = argilla_client._datasets[eval_overview.id] - examples = list(examples_iter) - assert len(saved_dataset) == len(examples) - assert saved_dataset[0].example_id == examples[0].id - assert saved_dataset[0].content["input"] == examples[0].input.input + assert len(list(async_in_memory_evaluation_repository.evaluation_overviews())) == 1 + assert 
len(DummyArgillaClient()._datasets[partial_evaluation_overview.id]) == 1 -def test_argilla_evaluator_can_aggregate_evaluation( - string_argilla_evaluator: ArgillaEvaluator[ - DummyStringInput, - DummyStringOutput, - DummyStringOutput, - ], +def test_argilla_evaluator_correctly_lists_failed_eval_counts( + dummy_string_example: Example[DummyStringInput, DummyStringOutput], + in_memory_dataset_repository: DatasetRepository, + in_memory_run_repository: InMemoryRunRepository, + async_in_memory_evaluation_repository: AsyncInMemoryEvaluationRepository, string_argilla_runner: Runner[DummyStringInput, DummyStringOutput], - string_dataset_id: str, - string_argilla_aggregator: ArgillaAggregator[DummyAggregatedEvaluation], ) -> None: - # given - argilla_client = cast( - StubArgillaClient, - string_argilla_evaluator._evaluation_repository._client, # type: ignore + dataset_id = in_memory_dataset_repository.create_dataset( + examples=[dummy_string_example] * 3, dataset_name="test-dataset" + ).id + run_overview = string_argilla_runner.run_dataset(dataset_id) + + evaluator = ArgillaEvaluator( + in_memory_dataset_repository, + in_memory_run_repository, + async_in_memory_evaluation_repository, + "dummy-string-task", + DummyStringTaskArgillaEvaluationLogic(), + FailedEvaluationDummyArgillaClient(), + workspace_id="workspace-id", ) - # when + + partial_evaluation_overview = evaluator.submit(run_overview.id) + assert ( + len( + async_in_memory_evaluation_repository.failed_example_evaluations( + partial_evaluation_overview.id, DummyStringEvaluation + ) + ) + == 1 + ) + eval_overview = evaluator.retrieve(partial_evaluation_overview.id) + + assert eval_overview.successful_evaluation_count == 1 + assert eval_overview.failed_evaluation_count == 2 + + +def test_argilla_evaluator_abort_on_error_works( + string_argilla_runner: Runner[DummyStringInput, DummyStringOutput], + string_dataset_id: str, + in_memory_dataset_repository: DatasetRepository, + in_memory_run_repository: InMemoryRunRepository, + async_in_memory_evaluation_repository: AsyncInMemoryEvaluationRepository, +) -> None: run_overview = string_argilla_runner.run_dataset(string_dataset_id) - eval_overview = string_argilla_evaluator.evaluate_runs(run_overview.id) - aggregated_eval_overview = string_argilla_aggregator.aggregate_evaluation( - eval_overview.id + + evaluator = ArgillaEvaluator( + in_memory_dataset_repository, + in_memory_run_repository, + async_in_memory_evaluation_repository, + "dummy-string-task", + DummyStringTaskArgillaEvaluationLogic(), + FailedEvaluationDummyArgillaClient(), + workspace_id="workspace-id", ) - # then - assert aggregated_eval_overview.statistics.score == argilla_client._score + with pytest.raises(CustomException): + evaluator.submit(run_overview.id, abort_on_error=True) def test_argilla_aggregation_logic_works() -> None: - argilla_aggregation_logic = InstructComparisonArgillaAggregationLogic() + argilla_aggregation_logic = ComparisonAggregationLogic() evaluations = ( - ArgillaEvaluation( - example_id=str(i), - record_id=str(i), - responses={"winner": random.choices([1, 2, 3], [0.5, 0.25, 0.25], k=1)[0]}, - metadata={ - "first": "player_1", - "second": "player_2" if i < 9000 else "player_3", - }, + ComparisonEvaluation( + first="player_1", + second="player_2" if i < 9000 else "player_3", + winner=MatchOutcome.from_rank_literal( + random.choices([1, 2, 3], [0.5, 0.25, 0.25], k=1)[0] + ), ) for i in range(10000) ) diff --git a/tests/evaluation/test_async_evaluation_repository.py 
b/tests/evaluation/test_async_evaluation_repository.py new file mode 100644 index 000000000..b3b92f169 --- /dev/null +++ b/tests/evaluation/test_async_evaluation_repository.py @@ -0,0 +1,186 @@ +from pathlib import Path +from typing import Iterable +from uuid import uuid4 + +from pytest import FixtureRequest, fixture, mark + +from intelligence_layer.core.tracer.tracer import utc_now +from intelligence_layer.evaluation import ( + AsyncEvaluationRepository, + AsyncFileEvaluationRepository, + RunOverview, +) +from intelligence_layer.evaluation.evaluation.domain import ( + EvaluationOverview, + PartialEvaluationOverview, +) + + +@fixture +def async_file_evaluation_repository(tmp_path: Path) -> AsyncFileEvaluationRepository: + return AsyncFileEvaluationRepository(tmp_path) + + +test_repository_fixtures = [ + "async_file_evaluation_repository", + "async_in_memory_evaluation_repository", +] + + +@fixture +def partial_evaluation_overviews( + run_overview: RunOverview, +) -> Iterable[PartialEvaluationOverview]: + evaluation_ids = [str(uuid4()) for _ in range(10)] + evaluation_overviews = [] + for evaluation_id in evaluation_ids: + evaluation_overviews.append( + PartialEvaluationOverview( + id=evaluation_id, + start_date=utc_now(), + run_overviews=frozenset([run_overview]), + submitted_evaluation_count=10, + description="test evaluation overview", + ) + ) + return evaluation_overviews + + +@fixture +def partial_evaluation_overview( + evaluation_id: str, run_overview: RunOverview +) -> PartialEvaluationOverview: + return PartialEvaluationOverview( + id=evaluation_id, + start_date=utc_now(), + run_overviews=frozenset([run_overview]), + submitted_evaluation_count=10, + description="test evaluation overview", + ) + + +@mark.parametrize( + "repository_fixture", + test_repository_fixtures, +) +def test_store_partial_evaluation_overview_stores_and_returns_given_evaluation_overview( + repository_fixture: str, + request: FixtureRequest, + partial_evaluation_overview: PartialEvaluationOverview, +) -> None: + evaluation_repository: AsyncEvaluationRepository = request.getfixturevalue( + repository_fixture + ) + + evaluation_repository.store_partial_evaluation_overview(partial_evaluation_overview) + retrieved_evaluation_overview = evaluation_repository.partial_evaluation_overview( + partial_evaluation_overview.id + ) + + assert retrieved_evaluation_overview == partial_evaluation_overview + + +@mark.parametrize( + "repository_fixture", + test_repository_fixtures, +) +def test_partial_evaluation_overview_returns_none_for_a_not_existing_overview_id( + repository_fixture: str, + request: FixtureRequest, +) -> None: + evaluation_repository: AsyncEvaluationRepository = request.getfixturevalue( + repository_fixture + ) + + evaluation_overview = evaluation_repository.partial_evaluation_overview( + "not-existing-evaluation-id" + ) + + assert evaluation_overview is None + + +@mark.parametrize( + "repository_fixture", + test_repository_fixtures, +) +def test_partial_evaluation_overviews_returns_all_evaluation_overviews( + repository_fixture: str, + request: FixtureRequest, + partial_evaluation_overviews: Iterable[PartialEvaluationOverview], +) -> None: + evaluation_repository: AsyncEvaluationRepository = request.getfixturevalue( + repository_fixture + ) + for evaluation_overview in partial_evaluation_overviews: + evaluation_repository.store_partial_evaluation_overview(evaluation_overview) + + stored_evaluation_overviews = list( + evaluation_repository.partial_evaluation_overviews() + ) + + assert 
stored_evaluation_overviews == sorted( + partial_evaluation_overviews, key=lambda eval_overview: eval_overview.id + ) + + +@mark.parametrize( + "repository_fixture", + test_repository_fixtures, +) +def test_partial_and_full_evaluation_overview_dont_overlap( + repository_fixture: str, + request: FixtureRequest, + partial_evaluation_overview: PartialEvaluationOverview, + evaluation_overview: EvaluationOverview, +) -> None: + evaluation_repository: AsyncEvaluationRepository = request.getfixturevalue( + repository_fixture + ) + + evaluation_repository.store_partial_evaluation_overview(partial_evaluation_overview) + evaluation_repository.store_evaluation_overview(evaluation_overview) + + retrieved_partial_evaluation_overview = ( + evaluation_repository.partial_evaluation_overview( + partial_evaluation_overview.id + ) + ) + retrieved_evaluation_overview = evaluation_repository.evaluation_overview( + partial_evaluation_overview.id + ) + + all_partial_overviews = list(evaluation_repository.partial_evaluation_overviews()) + all_full_overviews = list(evaluation_repository.evaluation_overviews()) + + assert retrieved_partial_evaluation_overview == partial_evaluation_overview + assert retrieved_evaluation_overview == evaluation_overview + + assert len(all_partial_overviews) == 1 + assert len(all_full_overviews) == 1 + + assert all_partial_overviews[0] == partial_evaluation_overview + assert all_full_overviews[0] == evaluation_overview + + +@mark.parametrize( + "repository_fixture", + test_repository_fixtures, +) +def test_can_retrieve_failed_evaluations_for_partial_evaluations( + repository_fixture: str, + request: FixtureRequest, + partial_evaluation_overview: PartialEvaluationOverview, +) -> None: + evaluation_repository: AsyncEvaluationRepository = request.getfixturevalue( + repository_fixture + ) + some_dummy_type = PartialEvaluationOverview + + evaluation_repository.store_partial_evaluation_overview(partial_evaluation_overview) + n_failed = len( + evaluation_repository.failed_example_evaluations( + partial_evaluation_overview.id, some_dummy_type + ) + ) + + assert n_failed == 0 diff --git a/tests/evaluation/test_dataset_repository.py b/tests/evaluation/test_dataset_repository.py index 2b94b42ac..6f9ca38f3 100644 --- a/tests/evaluation/test_dataset_repository.py +++ b/tests/evaluation/test_dataset_repository.py @@ -243,8 +243,8 @@ def test_examples_returns_all_examples_sorted_by_their_id( dataset_repository: DatasetRepository = request.getfixturevalue(repository_fixture) examples = [ Example( - input=DummyStringInput.any(), - expected_output=DummyStringOutput.any(), + input=DummyStringInput(), + expected_output=DummyStringOutput(), ) for i in range(0, 10) ] diff --git a/tests/evaluation/test_diff_evaluator.py b/tests/evaluation/test_diff_evaluator.py index 60519c808..6a96ad004 100644 --- a/tests/evaluation/test_diff_evaluator.py +++ b/tests/evaluation/test_diff_evaluator.py @@ -1,23 +1,16 @@ from pydantic import BaseModel -from intelligence_layer.core.task import Task -from intelligence_layer.core.tracer.tracer import Tracer -from intelligence_layer.evaluation.dataset.domain import Example -from intelligence_layer.evaluation.dataset.in_memory_dataset_repository import ( - InMemoryDatasetRepository, -) -from intelligence_layer.evaluation.evaluation.evaluator import ( +from intelligence_layer.core import Task, Tracer +from intelligence_layer.evaluation import ( + Example, IncrementalEvaluationLogic, IncrementalEvaluator, -) -from 
intelligence_layer.evaluation.evaluation.in_memory_evaluation_repository import ( + InMemoryDatasetRepository, InMemoryEvaluationRepository, -) -from intelligence_layer.evaluation.run.domain import SuccessfulExampleOutput -from intelligence_layer.evaluation.run.in_memory_run_repository import ( InMemoryRunRepository, + Runner, + SuccessfulExampleOutput, ) -from intelligence_layer.evaluation.run.runner import Runner class DummyEvaluation(BaseModel): diff --git a/tests/evaluation/test_evaluation_repository.py b/tests/evaluation/test_evaluation_repository.py index 4fabb8ce3..90750ebbc 100644 --- a/tests/evaluation/test_evaluation_repository.py +++ b/tests/evaluation/test_evaluation_repository.py @@ -86,7 +86,10 @@ def evaluation_overviews(run_overview: RunOverview) -> Iterable[EvaluationOvervi evaluation_overviews.append( EvaluationOverview( id=evaluation_id, - start=utc_now(), + start_date=utc_now(), + end_date=utc_now(), + successful_evaluation_count=1, + failed_evaluation_count=1, run_overviews=frozenset([run_overview]), description="test evaluation overview 1", ) @@ -364,6 +367,38 @@ def test_store_evaluation_overview_stores_and_returns_given_evaluation_overview( assert retrieved_evaluation_overview == evaluation_overview +@mark.parametrize( + "repository_fixture", + test_repository_fixtures, +) +def test_can_retieve_examples_and_failed_examples_after_storing_an_overview( + repository_fixture: str, + request: FixtureRequest, + evaluation_overview: EvaluationOverview, +) -> None: + some_dummy_type = EvaluationOverview + + evaluation_repository: EvaluationRepository = request.getfixturevalue( + repository_fixture + ) + + evaluation_repository.store_evaluation_overview(evaluation_overview) + + n_failed_examples = len( + evaluation_repository.failed_example_evaluations( + evaluation_overview.id, some_dummy_type + ) + ) + assert n_failed_examples == 0 + + n_examples = len( + evaluation_repository.example_evaluations( + evaluation_overview.id, some_dummy_type + ) + ) + assert n_examples == 0 + + @mark.parametrize( "repository_fixture", test_repository_fixtures, diff --git a/tests/evaluation/test_evaluator.py b/tests/evaluation/test_evaluator.py index c0e0d0274..e6a5d1c33 100644 --- a/tests/evaluation/test_evaluator.py +++ b/tests/evaluation/test_evaluator.py @@ -232,21 +232,28 @@ def comparing_aggregator( ) -def test_eval_and_aggregate_runs_returns_generic_statistics( +def test_eval_runs_returns_generic_statistics( dummy_evaluator: Evaluator[str, str, None, DummyEvaluation], - dummy_aggregator: Aggregator[ - DummyEvaluation, DummyAggregatedEvaluationWithResultList - ], dummy_runner: Runner[str, str], - dataset_id: str, + in_memory_dataset_repository: InMemoryDatasetRepository, ) -> None: + examples = [ + Example(input="success", expected_output=None, id="example-1"), + Example(input="success", expected_output=None, id="example-2"), + Example(input="success", expected_output=None, id="example-3"), + Example(input=FAIL_IN_TASK_INPUT, expected_output=None, id="example-4"), + Example(input=FAIL_IN_TASK_INPUT, expected_output=None, id="example-5"), + Example(input=FAIL_IN_EVAL_INPUT, expected_output=None, id="example-6"), + ] + dataset_id = in_memory_dataset_repository.create_dataset( + examples=examples, dataset_name="test-dataset" + ).id + run_overview = dummy_runner.run_dataset(dataset_id) evaluation_overview = dummy_evaluator.evaluate_runs(run_overview.id) - aggregation_overview = dummy_aggregator.aggregate_evaluation(evaluation_overview.id) - assert 
next(iter(aggregation_overview.run_overviews())).dataset_id == dataset_id - assert aggregation_overview.successful_evaluation_count == 1 - assert aggregation_overview.failed_evaluation_count == 2 + assert evaluation_overview.successful_evaluation_count == 3 + assert evaluation_overview.failed_evaluation_count == 1 def test_evaluator_aborts_on_error( diff --git a/tests/evaluation/test_instruct_comparison_argilla_evaluator.py b/tests/evaluation/test_instruct_comparison_argilla_evaluator.py index 15648a200..dd9e1368e 100644 --- a/tests/evaluation/test_instruct_comparison_argilla_evaluator.py +++ b/tests/evaluation/test_instruct_comparison_argilla_evaluator.py @@ -17,24 +17,21 @@ ) from intelligence_layer.core import CompleteOutput, InstructInput, utc_now from intelligence_layer.evaluation import ( - AggregatedInstructComparison, - ArgillaAggregator, - ArgillaEvaluationRepository, + Aggregator, ArgillaEvaluator, + AsyncInMemoryEvaluationRepository, + ComparisonAggregationLogic, + ComparisonEvaluation, EloCalculator, Example, ExampleOutput, InMemoryAggregationRepository, InMemoryDatasetRepository, - InMemoryEvaluationRepository, InMemoryRunRepository, - InstructComparisonArgillaAggregationLogic, + InstructComparisonArgillaEvaluationLogic, MatchOutcome, RunOverview, ) -from intelligence_layer.evaluation.evaluation.argilla_evaluator import ( - create_instruct_comparison_argilla_evaluation_classes, -) class ArgillaFake(ArgillaClient): @@ -80,53 +77,23 @@ def argilla_fake() -> ArgillaClient: return ArgillaFake() -@fixture -def argilla_repository( - in_memory_evaluation_repository: InMemoryEvaluationRepository, - argilla_fake: ArgillaClient, -) -> ArgillaEvaluationRepository: - ( - evaluation_logic, - evaluation_repository, - ) = create_instruct_comparison_argilla_evaluation_classes( - "workspace", in_memory_evaluation_repository, argilla_fake, None - ) - return evaluation_repository - - @fixture def evaluator( in_memory_dataset_repository: InMemoryDatasetRepository, in_memory_run_repository: InMemoryRunRepository, - in_memory_evaluation_repository: InMemoryEvaluationRepository, + async_in_memory_evaluation_repository: AsyncInMemoryEvaluationRepository, argilla_fake: ArgillaClient, -) -> ArgillaEvaluator[InstructInput, CompleteOutput, None]: - ( - evaluation_logic, - evaluation_repository, - ) = create_instruct_comparison_argilla_evaluation_classes( - "workspace", in_memory_evaluation_repository, argilla_fake, None - ) - return ArgillaEvaluator( - in_memory_dataset_repository, - in_memory_run_repository, - evaluation_repository, - "instruct-evaluator", - evaluation_logic, - ) +) -> ArgillaEvaluator[InstructInput, CompleteOutput, None, ComparisonEvaluation]: + evaluation_logic = InstructComparisonArgillaEvaluationLogic() - -@fixture -def aggregator( - argilla_repository: ArgillaEvaluationRepository, - in_memory_aggregation_repository: InMemoryAggregationRepository, - argilla_aggregation_logic: InstructComparisonArgillaAggregationLogic, -) -> ArgillaAggregator[AggregatedInstructComparison]: - return ArgillaAggregator( - argilla_repository, - in_memory_aggregation_repository, - "instruct-evaluator", - argilla_aggregation_logic, + return ArgillaEvaluator( + dataset_repository=in_memory_dataset_repository, + run_repository=in_memory_run_repository, + evaluation_repository=async_in_memory_evaluation_repository, + description="instruct-evaluator", + workspace_id="workspace", + argilla_client=argilla_fake, + evaluation_logic=evaluation_logic, ) @@ -143,6 +110,11 @@ def any_instruct_output() -> 
CompleteOutput: ) +@fixture +def argilla_aggregation_logic() -> ComparisonAggregationLogic: + return ComparisonAggregationLogic() + + def create_dummy_dataset( in_memory_dataset_repository: InMemoryDatasetRepository, ) -> str: @@ -188,13 +160,24 @@ def create_dummy_runs( def test_evaluate_run_submits_pairwise_comparison_records( - evaluator: ArgillaEvaluator[InstructInput, CompleteOutput, None], - aggregator: ArgillaAggregator[AggregatedInstructComparison], + evaluator: ArgillaEvaluator[ + InstructInput, CompleteOutput, None, ComparisonEvaluation + ], in_memory_dataset_repository: InMemoryDatasetRepository, in_memory_run_repository: InMemoryRunRepository, + async_in_memory_evaluation_repository: AsyncInMemoryEvaluationRepository, + in_memory_aggregation_repository: InMemoryAggregationRepository, + argilla_aggregation_logic: ComparisonAggregationLogic, any_instruct_output: CompleteOutput, argilla_fake: ArgillaFake, ) -> None: + aggregator = Aggregator( + async_in_memory_evaluation_repository, + in_memory_aggregation_repository, + "instruct-evaluator", + argilla_aggregation_logic, + ) + run_count = 10 run_ids = [f"{i}" for i in range(run_count)] dataset_id = create_dummy_dataset(in_memory_dataset_repository) @@ -202,15 +185,16 @@ def test_evaluate_run_submits_pairwise_comparison_records( in_memory_run_repository, any_instruct_output, run_ids, dataset_id ) - evaluation_overview = evaluator.evaluate_runs(*run_ids) + partial_overview = evaluator.submit(*run_ids) pairs = combinations(run_ids, 2) assert sorted( tuple(sorted((record_data.metadata["first"], record_data.metadata["second"]))) - for record_data in argilla_fake.record_data(evaluation_overview.id) + for record_data in argilla_fake.record_data(partial_overview.id) ) == sorted(pairs) + eval_overview = evaluator.retrieve(partial_overview.id) - elo_score = aggregator.aggregate_evaluation(evaluation_overview.id) + elo_score = aggregator.aggregate_evaluation(eval_overview.id) scores = elo_score.statistics.scores # lower id always wins, should be sorted for i in range(run_count - 1): @@ -221,20 +205,21 @@ def test_evaluate_run_submits_pairwise_comparison_records( def test_evaluate_run_only_evaluates_high_priority( in_memory_dataset_repository: InMemoryDatasetRepository, in_memory_run_repository: InMemoryRunRepository, - in_memory_evaluation_repository: InMemoryEvaluationRepository, + async_in_memory_evaluation_repository: AsyncInMemoryEvaluationRepository, any_instruct_output: CompleteOutput, argilla_fake: ArgillaFake, ) -> None: relevant_ids = frozenset({"1", "2"}) - eval_logic, eval_repository = create_instruct_comparison_argilla_evaluation_classes( - "workspace", in_memory_evaluation_repository, argilla_fake, relevant_ids - ) + evaluation_logic = InstructComparisonArgillaEvaluationLogic(relevant_ids) + evaluator = ArgillaEvaluator( - in_memory_dataset_repository, - in_memory_run_repository, - eval_repository, - "instruct-evaluator", - eval_logic, + dataset_repository=in_memory_dataset_repository, + run_repository=in_memory_run_repository, + evaluation_repository=async_in_memory_evaluation_repository, + description="instruct-evaluator", + workspace_id="workspace", + argilla_client=argilla_fake, + evaluation_logic=evaluation_logic, ) run_count = 10 @@ -245,7 +230,8 @@ def test_evaluate_run_only_evaluates_high_priority( in_memory_run_repository, any_instruct_output, run_ids, dataset_id ) - evaluation_overview = evaluator.evaluate_runs(*run_ids) + partial_overview = evaluator.submit(*run_ids) + evaluation_overview = 
evaluator.retrieve(partial_overview.id) def relevant_ids_in_record(record: RecordData) -> bool: players = [record.metadata["first"], record.metadata["second"]]
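The `IncrementalEvaluator` docstrings in this patch describe the incremental flow in the abstract; the sketch below shows how it might be wired up, assuming the signatures from `incremental_evaluator.py` in the diff hold. `CountingIncrementalLogic`, `OutputCount`, the description string, and the run IDs are illustrative placeholders, and the repositories are assumed to already contain the referenced dataset and runs (e.g. produced with a `Runner` beforehand).

    from pydantic import BaseModel

    from intelligence_layer.evaluation import (
        Example,
        IncrementalEvaluationLogic,
        IncrementalEvaluator,
        InMemoryDatasetRepository,
        InMemoryEvaluationRepository,
        InMemoryRunRepository,
        SuccessfulExampleOutput,
    )


    class OutputCount(BaseModel):
        # hypothetical evaluation result: how many outputs were new vs. already evaluated
        new_outputs: int
        previously_evaluated_outputs: int


    class CountingIncrementalLogic(
        IncrementalEvaluationLogic[str, str, None, OutputCount]
    ):
        def do_incremental_evaluate(
            self,
            example: Example[str, None],
            outputs: list[SuccessfulExampleOutput[str]],
            already_evaluated_outputs: list[list[SuccessfulExampleOutput[str]]],
        ) -> OutputCount:
            # `outputs` only contains outputs of runs that were not part of a previous
            # evaluation; earlier outputs arrive grouped per previous evaluation
            return OutputCount(
                new_outputs=len(outputs),
                previously_evaluated_outputs=sum(
                    len(group) for group in already_evaluated_outputs
                ),
            )


    # these repositories are assumed to already hold the dataset and the
    # runs "run-1"/"run-2"/"run-3"
    dataset_repository = InMemoryDatasetRepository()
    run_repository = InMemoryRunRepository()
    evaluation_repository = InMemoryEvaluationRepository()

    evaluator = IncrementalEvaluator(
        dataset_repository=dataset_repository,
        run_repository=run_repository,
        evaluation_repository=evaluation_repository,
        description="incremental-example",
        incremental_evaluation_logic=CountingIncrementalLogic(),
    )

    first_overview = evaluator.evaluate_runs("run-1", "run-2")

    # later: evaluate an additional run without re-judging the earlier ones
    second_overview = evaluator.evaluate_additional_runs(
        "run-1",
        "run-2",
        "run-3",
        previous_evaluation_ids=[first_overview.id],
    )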
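The `submit`/`retrieve` split of the `ArgillaEvaluator`, exercised by the tests in this patch, might look roughly like this in application code. `HumanScore`, `HumanScoreEvaluationLogic`, `run_human_evaluation`, and the workspace ID are hypothetical names; the `ArgillaClient` instance is whatever client implementation points at your Argilla deployment, and the constructor argument order follows the test fixtures in the diff.

    from pydantic import BaseModel

    from intelligence_layer.connectors import (
        ArgillaClient,
        ArgillaEvaluation,
        Field,
        Question,
        RecordData,
    )
    from intelligence_layer.evaluation import (
        ArgillaEvaluationLogic,
        ArgillaEvaluator,
        AsyncInMemoryEvaluationRepository,
        Example,
        InMemoryDatasetRepository,
        InMemoryRunRepository,
        RecordDataSequence,
        SuccessfulExampleOutput,
    )


    class HumanScore(BaseModel):
        # hypothetical evaluation type filled from the annotator's response
        score: float


    class HumanScoreEvaluationLogic(
        ArgillaEvaluationLogic[str, str, None, HumanScore]
    ):
        def __init__(self) -> None:
            # fields and questions now live in the logic instead of the repository
            super().__init__(
                fields={"output": Field(name="output", title="Output")},
                questions=[
                    Question(
                        name="score",
                        title="Score",
                        description="Rate the output",
                        options=[1, 2, 3],
                    )
                ],
            )

        def to_record(
            self,
            example: Example[str, None],
            *outputs: SuccessfulExampleOutput[str],
        ) -> RecordDataSequence:
            return RecordDataSequence(
                records=[
                    RecordData(
                        content={"output": output.output},
                        example_id=example.id,
                        metadata={},
                    )
                    for output in outputs
                ]
            )

        def from_record(self, argilla_evaluation: ArgillaEvaluation) -> HumanScore:
            return HumanScore(score=float(argilla_evaluation.responses["score"]))


    def run_human_evaluation(
        client: ArgillaClient,
        dataset_repository: InMemoryDatasetRepository,
        run_repository: InMemoryRunRepository,
        run_id: str,
    ) -> None:
        evaluation_repository = AsyncInMemoryEvaluationRepository()
        evaluator = ArgillaEvaluator(
            dataset_repository,
            run_repository,
            evaluation_repository,
            "human-feedback-eval",
            HumanScoreEvaluationLogic(),
            client,
            workspace_id="my-workspace",  # placeholder workspace
        )
        # pushes one record per example/output to Argilla for labelling
        partial = evaluator.submit(run_id)
        print(partial.submitted_evaluation_count)

        # ... once annotators have labelled the records in Argilla ...
        overview = evaluator.retrieve(partial.id)
        print(overview.successful_evaluation_count, overview.failed_evaluation_count)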
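Reading the stored overviews back from disk with the new `AsyncFileEvaluationRepository` might look as follows, assuming the methods added in `file_evaluation_repository.py` in this diff; the `./evaluations` path is a placeholder.

    from pathlib import Path

    from intelligence_layer.evaluation import AsyncFileEvaluationRepository

    repository = AsyncFileEvaluationRepository(Path("./evaluations"))

    # partial overviews are written by `ArgillaEvaluator.submit(...)`
    for partial_id in repository.partial_evaluation_overview_ids():
        partial = repository.partial_evaluation_overview(partial_id)
        if partial is not None:
            print(partial.id, partial.submitted_evaluation_count)

    # full overviews exist once `ArgillaEvaluator.retrieve(partial_id)` has run
    for evaluation_id in repository.evaluation_overview_ids():
        print(repository.evaluation_overview(evaluation_id))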