diff --git a/CHANGELOG.md b/CHANGELOG.md
index ea2b6e5a..c513248c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,6 +9,7 @@
 - Introduce `BenchmarkRepository`and `StudioBenchmarkRepository`
 - Add `create_project` bool to `StudioClient.__init__()` to enable users to automatically create their Studio projects
 - Add progressbar to the `Runner` to be able to track the `Run`
+- Add `StudioClient.submit_benchmark_lineages` function and include it in `StudioClient.submit_benchmark_execution`
 
 ### Fixes
...
diff --git a/src/intelligence_layer/connectors/studio/studio.py b/src/intelligence_layer/connectors/studio/studio.py
index a67f7d1b..d5f88671 100644
--- a/src/intelligence_layer/connectors/studio/studio.py
+++ b/src/intelligence_layer/connectors/studio/studio.py
@@ -1,3 +1,4 @@
+import gzip
 import json
 import os
 from collections import defaultdict, deque
@@ -8,7 +9,7 @@
 from uuid import uuid4
 
 import requests
-from pydantic import BaseModel, Field
+from pydantic import BaseModel, Field, RootModel
 from requests.exceptions import ConnectionError, MissingSchema
 
 from intelligence_layer.connectors import JsonSerializable
@@ -24,6 +25,8 @@
 Input = TypeVar("Input", bound=PydanticSerializable)
 ExpectedOutput = TypeVar("ExpectedOutput", bound=PydanticSerializable)
+Output = TypeVar("Output", bound=PydanticSerializable)
+Evaluation = TypeVar("Evaluation", bound=BaseModel, covariant=True)
 
 
 class StudioProject(BaseModel):
@@ -140,6 +143,38 @@ class GetDatasetExamplesResponse(BaseModel, Generic[Input, ExpectedOutput]):
     items: Sequence[StudioExample[Input, ExpectedOutput]]
 
 
+class BenchmarkLineage(BaseModel, Generic[Input, Output, ExpectedOutput, Evaluation]):
+    trace_id: str
+    input: Input
+    expected_output: ExpectedOutput
+    output: Output
+    example_metadata: Optional[dict[str, Any]] = None
+    evaluation: Any
+    run_latency: int
+    run_tokens: int
+
+
+class PostBenchmarkLineagesRequest(RootModel[Sequence[BenchmarkLineage]]):
+    pass
+
+
+class PostBenchmarkLineagesResponse(RootModel[Sequence[str]]):
+    pass
+
+
+class GetBenchmarkLineageResponse(BaseModel):
+    id: str
+    trace_id: str
+    benchmark_execution_id: str
+    input: Any
+    expected_output: Any
+    example_metadata: Optional[dict[str, Any]] = None
+    output: Any
+    evaluation: Any
+    run_latency: int
+    run_tokens: int
+
+
 class StudioClient:
     """Client for communicating with Studio.
@@ -403,7 +438,7 @@ def get_dataset_examples(
             if page is None:
                 break
 
-    def create_benchmark(
+    def submit_benchmark(
         self,
         dataset_id: str,
         eval_logic: EvaluationLogicIdentifier,
@@ -449,7 +484,7 @@ def get_benchmark(
             return None
        return GetBenchmarkResponse.model_validate(response_text)
 
-    def create_benchmark_execution(
+    def submit_benchmark_execution(
         self, benchmark_id: str, data: PostBenchmarkExecution
     ) -> str:
         url = urljoin(
@@ -464,6 +499,98 @@
         self._raise_for_status(response)
         return str(response.json())
 
+    def submit_benchmark_lineages(
+        self,
+        benchmark_lineages: Sequence[BenchmarkLineage],
+        benchmark_id: str,
+        execution_id: str,
+        max_payload_size: int = 50
+        * 1024
+        * 1024,  # Maximum request size handled by Studio
+    ) -> PostBenchmarkLineagesResponse:
+        """Submit benchmark lineages in batches to avoid exceeding the maximum payload size.
+
+        Args:
+            benchmark_lineages: List of :class:`BenchmarkLineage` objects to submit.
+            benchmark_id: ID of the benchmark.
+            execution_id: ID of the execution.
+            max_payload_size: Maximum size of the payload in bytes. Defaults to 50MB.
+
+        Returns:
+            Response containing the results of the submissions.
+ """ + all_responses = [] + remaining_lineages = list(benchmark_lineages) + lineage_sizes = [ + len(lineage.model_dump_json().encode("utf-8")) + for lineage in benchmark_lineages + ] + + while remaining_lineages: + batch = [] + current_size = 0 + # Build batch while checking size + for lineage, size in zip(remaining_lineages, lineage_sizes, strict=True): + if current_size + size <= max_payload_size: + batch.append(lineage) + current_size += size + else: + break + + if batch: + # Send batch + response = self._send_compressed_batch( + batch, benchmark_id, execution_id + ) + all_responses.extend(response) + + else: # Only reached if a lineage is too big for the request + print("Lineage exceeds maximum of upload size", lineage) + batch.append(lineage) + remaining_lineages = remaining_lineages[len(batch) :] + lineage_sizes = lineage_sizes[len(batch) :] + + return PostBenchmarkLineagesResponse(all_responses) + + def get_benchmark_lineage( + self, benchmark_id: str, execution_id: str, lineage_id: str + ) -> GetBenchmarkLineageResponse | None: + url = urljoin( + self.url, + f"/api/projects/{self.project_id}/evaluation/benchmarks/{benchmark_id}/executions/{execution_id}/lineages/{lineage_id}", + ) + response = requests.get( + url, + headers=self._headers, + ) + self._raise_for_status(response) + response_text = response.json() + if response_text is None: + return None + return GetBenchmarkLineageResponse.model_validate(response_text) + + def _send_compressed_batch( + self, batch: list[BenchmarkLineage], benchmark_id: str, execution_id: str + ) -> list[str]: + url = urljoin( + self.url, + f"/api/projects/{self.project_id}/evaluation/benchmarks/{benchmark_id}/executions/{execution_id}/lineages", + ) + + json_data = PostBenchmarkLineagesRequest(root=batch).model_dump_json() + compressed_data = gzip.compress(json_data.encode("utf-8")) + + headers = {**self._headers, "Content-Encoding": "gzip"} + + response = requests.post( + url, + headers=headers, + data=compressed_data, + ) + + self._raise_for_status(response) + return response.json() + def _raise_for_status(self, response: requests.Response) -> None: try: response.raise_for_status() diff --git a/src/intelligence_layer/evaluation/benchmark/studio_benchmark.py b/src/intelligence_layer/evaluation/benchmark/studio_benchmark.py index 39ecfcd6..988e1af9 100644 --- a/src/intelligence_layer/evaluation/benchmark/studio_benchmark.py +++ b/src/intelligence_layer/evaluation/benchmark/studio_benchmark.py @@ -1,4 +1,5 @@ import inspect +from collections.abc import Sequence from datetime import datetime from http import HTTPStatus from typing import Any, Optional @@ -9,6 +10,7 @@ from intelligence_layer.connectors.studio.studio import ( AggregationLogicIdentifier, + BenchmarkLineage, EvaluationLogicIdentifier, PostBenchmarkExecution, StudioClient, @@ -39,6 +41,9 @@ from intelligence_layer.evaluation.evaluation.in_memory_evaluation_repository import ( InMemoryEvaluationRepository, ) +from intelligence_layer.evaluation.infrastructure.repository_navigator import ( + EvaluationLineage, +) from intelligence_layer.evaluation.run.in_memory_run_repository import ( InMemoryRunRepository, ) @@ -135,17 +140,60 @@ def execute( statistics=aggregation_overview.statistics.model_dump_json(), ) - benchmark_execution_id = self.client.create_benchmark_execution( + benchmark_execution_id = self.client.submit_benchmark_execution( benchmark_id=self.id, data=data ) - evaluation_lineages = self.evaluator.evaluation_lineages(evaluation_overview.id) + evaluation_lineages = list( + 
+            self.evaluator.evaluation_lineages(evaluation_overview.id)
+        )
+        trace_ids = []
         for lineage in tqdm(evaluation_lineages, desc="Submitting traces to Studio"):
             trace = lineage.tracers[0]
             assert trace
-            self.client.submit_trace(trace.export_for_viewing())
+            trace_id = self.client.submit_trace(trace.export_for_viewing())
+            trace_ids.append(trace_id)
+
+        benchmark_lineages = self._create_benchmark_lineages(
+            eval_lineages=evaluation_lineages,
+            trace_ids=trace_ids,
+        )
+        self.client.submit_benchmark_lineages(
+            benchmark_lineages=benchmark_lineages,
+            execution_id=benchmark_execution_id,
+            benchmark_id=self.id,
+        )
+        return benchmark_execution_id
 
+    def _create_benchmark_lineages(
+        self,
+        eval_lineages: list[
+            EvaluationLineage[Input, ExpectedOutput, Output, Evaluation]
+        ],
+        trace_ids: list[str],
+    ) -> Sequence[BenchmarkLineage[Input, Output, ExpectedOutput, Evaluation]]:
+        return [
+            self._create_benchmark_lineage(eval_lineage, trace_id)
+            for eval_lineage, trace_id in zip(eval_lineages, trace_ids, strict=True)
+        ]
+
+    def _create_benchmark_lineage(
+        self,
+        eval_lineage: EvaluationLineage[Input, ExpectedOutput, Output, Evaluation],
+        trace_id: str,
+    ) -> BenchmarkLineage:
+        return BenchmarkLineage(
+            trace_id=trace_id,
+            input=eval_lineage.example.input,
+            expected_output=eval_lineage.example.expected_output,
+            example_metadata=eval_lineage.example.metadata,
+            output=eval_lineage.outputs[0].output,
+            evaluation=eval_lineage.evaluation.result,
+            run_latency=0,  # TODO: Implement this
+            run_tokens=0,  # TODO: Implement this
+        )
+
 
 class StudioBenchmarkRepository(BenchmarkRepository):
     def __init__(self, studio_client: StudioClient):
@@ -161,7 +209,7 @@ def create_benchmark(
         description: Optional[str] = None,
     ) -> StudioBenchmark:
         try:
-            benchmark_id = self.client.create_benchmark(
+            benchmark_id = self.client.submit_benchmark(
                 dataset_id,
                 create_evaluation_logic_identifier(eval_logic),
                 create_aggregation_logic_identifier(aggregation_logic),
diff --git a/tests/connectors/studio/test_studio_benchmark.py b/tests/connectors/studio/test_studio_benchmark.py
index 4cce95fb..db7881df 100644
--- a/tests/connectors/studio/test_studio_benchmark.py
+++ b/tests/connectors/studio/test_studio_benchmark.py
@@ -10,12 +10,16 @@
 from intelligence_layer.connectors.studio.studio import (
     AggregationLogicIdentifier,
+    BenchmarkLineage,
     EvaluationLogicIdentifier,
     PostBenchmarkExecution,
+    PostBenchmarkLineagesRequest,
     StudioClient,
     StudioDataset,
     StudioExample,
 )
+from intelligence_layer.core.tracer.in_memory_tracer import InMemoryTracer
+from intelligence_layer.core.tracer.tracer import ExportedSpan
 from intelligence_layer.evaluation.aggregation.aggregator import AggregationLogic
 from intelligence_layer.evaluation.benchmark.studio_benchmark import (
     create_aggregation_logic_identifier,
@@ -25,6 +29,11 @@
 from intelligence_layer.evaluation.evaluation.evaluator.evaluator import (
     SingleOutputEvaluationLogic,
 )
+from tests.connectors.studio.test_studio import TracerTestTask
+
+
+class DummyExample(Example[str, str]):
+    data: str
 
 
 class DummyEvaluation(BaseModel):
@@ -60,6 +69,14 @@ def aggregate(self, evaluations: Iterable[DummyEvaluation]) -> DummyAggregation:
         return DummyAggregation(num_evaluations=len(list(evaluations)))
 
 
+class DummyBenchmarkLineage(BenchmarkLineage[str, str, str, DummyEvaluation]):
+    pass
+
+
+class DummyPostBenchmarkLineagesRequest(PostBenchmarkLineagesRequest):
+    pass
+
+
 @fixture
 def studio_dataset(
     studio_client: StudioClient, examples: Sequence[StudioExample[str, str]]
 ) -> str:
@@ -67,6 +84,31 @@ def studio_dataset(
     return studio_client.submit_dataset(StudioDataset(name="dataset_name"), examples)
 
 
+@fixture
+def post_benchmark_execution() -> PostBenchmarkExecution:
+    return PostBenchmarkExecution(
+        name="name",
+        description="Test benchmark execution",
+        labels={"performance", "testing"},
+        metadata={"project": "AI Testing", "team": "QA"},
+        start=datetime.now(),
+        end=datetime.now(),
+        run_start=datetime.now(),
+        run_end=datetime.now(),
+        run_successful_count=10,
+        run_failed_count=2,
+        run_success_avg_latency=120,
+        run_success_avg_token_count=300,
+        eval_start=datetime.now(),
+        eval_end=datetime.now(),
+        eval_successful_count=8,
+        eval_failed_count=1,
+        aggregation_start=datetime.now(),
+        aggregation_end=datetime.now(),
+        statistics=DummyAggregatedEvaluation(score=1.0).model_dump_json(),
+    )
+
+
 @fixture
 def evaluation_logic_identifier() -> EvaluationLogicIdentifier:
     return create_evaluation_logic_identifier(DummyEvaluationLogic())
@@ -77,13 +119,75 @@ def aggregation_logic_identifier() -> AggregationLogicIdentifier:
     return create_aggregation_logic_identifier(DummyAggregationLogic())
 
 
+@fixture
+def test_trace() -> Sequence[ExportedSpan]:
+    tracer = InMemoryTracer()
+    task = TracerTestTask()
+    task.run("my input", tracer)
+    return tracer.export_for_viewing()
+
+
+@fixture
+def with_uploaded_test_trace(
+    test_trace: Sequence[ExportedSpan], studio_client: StudioClient
+) -> str:
+    trace_id = studio_client.submit_trace(test_trace)
+    return trace_id
+
+
+@fixture
+def with_uploaded_benchmark(
+    studio_client: StudioClient,
+    studio_dataset: str,
+    evaluation_logic_identifier: EvaluationLogicIdentifier,
+    aggregation_logic_identifier: AggregationLogicIdentifier,
+) -> str:
+    benchmark_id = studio_client.submit_benchmark(
+        studio_dataset,
+        evaluation_logic_identifier,
+        aggregation_logic_identifier,
+        "benchmark_name",
+    )
+    return benchmark_id
+
+
+@fixture
+def with_uploaded_benchmark_execution(
+    studio_client: StudioClient,
+    studio_dataset: str,
+    evaluation_logic_identifier: EvaluationLogicIdentifier,
+    aggregation_logic_identifier: AggregationLogicIdentifier,
+    with_uploaded_benchmark: str,
+    post_benchmark_execution: PostBenchmarkExecution,
+) -> str:
+    benchmark_execution_id = studio_client.submit_benchmark_execution(
+        benchmark_id=with_uploaded_benchmark, data=post_benchmark_execution
+    )
+    return benchmark_execution_id
+
+
+def dummy_lineage(
+    trace_id: str, input: str = "input", output: str = "output"
+) -> DummyBenchmarkLineage:
+    return DummyBenchmarkLineage(
+        trace_id=trace_id,
+        input=input,
+        expected_output="output",
+        example_metadata={"key3": "value3"},
+        output=output,
+        evaluation={"key5": "value5"},
+        run_latency=1,
+        run_tokens=3,
+    )
+
+
 def test_create_benchmark(
     studio_client: StudioClient,
     studio_dataset: str,
     evaluation_logic_identifier: EvaluationLogicIdentifier,
     aggregation_logic_identifier: AggregationLogicIdentifier,
 ) -> None:
-    benchmark_id = studio_client.create_benchmark(
+    benchmark_id = studio_client.submit_benchmark(
         studio_dataset,
         evaluation_logic_identifier,
         aggregation_logic_identifier,
@@ -99,7 +203,7 @@ def test_create_benchmark_with_non_existing_dataset(
     aggregation_logic_identifier: AggregationLogicIdentifier,
 ) -> None:
     with pytest.raises(HTTPError, match=str(HTTPStatus.BAD_REQUEST.value)):
-        studio_client.create_benchmark(
+        studio_client.submit_benchmark(
             "fake_id",
             evaluation_logic_identifier,
             aggregation_logic_identifier,
@@ -110,18 +214,12 @@ def test_get_benchmark(
     studio_client: StudioClient,
     studio_dataset: str,
-    evaluation_logic_identifier: EvaluationLogicIdentifier,
-    aggregation_logic_identifier: AggregationLogicIdentifier,
+    with_uploaded_benchmark: str,
 ) -> None:
     dummy_evaluation_logic = """return DummyEvaluation(result="success")"""
     benchmark_name = "benchmark_name"
 
-    benchmark_id = studio_client.create_benchmark(
-        studio_dataset,
-        evaluation_logic_identifier,
-        aggregation_logic_identifier,
-        benchmark_name,
-    )
+    benchmark_id = with_uploaded_benchmark
 
     benchmark = studio_client.get_benchmark(benchmark_id)
     assert benchmark
@@ -138,41 +236,83 @@ def test_get_non_existing_benchmark(studio_client: StudioClient) -> None:
 def test_can_create_benchmark_execution(
     studio_client: StudioClient,
-    studio_dataset: str,
-    evaluation_logic_identifier: EvaluationLogicIdentifier,
-    aggregation_logic_identifier: AggregationLogicIdentifier,
+    with_uploaded_benchmark: str,
+    post_benchmark_execution: PostBenchmarkExecution,
 ) -> None:
-    benchmark_id = studio_client.create_benchmark(
-        studio_dataset,
-        evaluation_logic_identifier,
-        aggregation_logic_identifier,
-        "benchmark_name",
-    )
+    benchmark_id = with_uploaded_benchmark
 
-    example_request = PostBenchmarkExecution(
-        name="name",
-        description="Test benchmark execution",
-        labels={"performance", "testing"},
-        metadata={"project": "AI Testing", "team": "QA"},
-        start=datetime.now(),
-        end=datetime.now(),
-        run_start=datetime.now(),
-        run_end=datetime.now(),
-        run_successful_count=10,
-        run_failed_count=2,
-        run_success_avg_latency=120,
-        run_success_avg_token_count=300,
-        eval_start=datetime.now(),
-        eval_end=datetime.now(),
-        eval_successful_count=8,
-        eval_failed_count=1,
-        aggregation_start=datetime.now(),
-        aggregation_end=datetime.now(),
-        statistics=DummyAggregatedEvaluation(score=1.0).model_dump_json(),
-    )
+    example_request = post_benchmark_execution
 
-    benchmark_execution_id = studio_client.create_benchmark_execution(
+    benchmark_execution_id = studio_client.submit_benchmark_execution(
         benchmark_id=benchmark_id, data=example_request
     )
 
     assert UUID(benchmark_execution_id)
+
+
+def test_can_submit_lineages(
+    studio_client: StudioClient,
+    with_uploaded_test_trace: str,
+    with_uploaded_benchmark: str,
+    with_uploaded_benchmark_execution: str,
+    post_benchmark_execution: PostBenchmarkExecution,
+) -> None:
+    trace_id = with_uploaded_test_trace
+    benchmark_id = with_uploaded_benchmark
+    benchmark_execution_id = with_uploaded_benchmark_execution
+
+    lineages = [
+        dummy_lineage(
+            trace_id,
+        ),
+        dummy_lineage(trace_id, "slightly longer input", "slightly_longer_output"),
+    ]
+
+    lineage_ids = studio_client.submit_benchmark_lineages(
+        benchmark_lineages=lineages,
+        benchmark_id=benchmark_id,
+        execution_id=benchmark_execution_id,
+        max_payload_size=len(lineages[1].model_dump_json().encode("utf-8"))
+        + 1,  # to enforce making two requests for the lineages
+    )
+
+    assert len(lineage_ids.root) == len(lineages)
+    for lineage_id in lineage_ids.root:
+        assert UUID(lineage_id)
+
+
+def test_submit_lineage_skips_lineages_exceeding_request_size(
+    studio_client: StudioClient,
+    with_uploaded_test_trace: str,
+    with_uploaded_benchmark: str,
+    with_uploaded_benchmark_execution: str,
+) -> None:
+    trace_id = with_uploaded_test_trace
+    benchmark_id = with_uploaded_benchmark
+    benchmark_execution_id = with_uploaded_benchmark_execution
+
+    lineages = [
+        dummy_lineage(trace_id),
+        dummy_lineage(
+            trace_id,
+            input="input input2 input3 input4 input5",
+            output="output output output output",
+        ),
+    ]
+
+    lineage_ids = studio_client.submit_benchmark_lineages(
+        benchmark_lineages=lineages,
+        benchmark_id=benchmark_id,
+        execution_id=benchmark_execution_id,
+        max_payload_size=len(lineages[0].model_dump_json().encode("utf-8"))
+        + 1,  # to enforce that the second lineage exceeds the limit
+    )
+
+    fetched_lineage = studio_client.get_benchmark_lineage(
+        benchmark_id=benchmark_id,
+        execution_id=benchmark_execution_id,
+        lineage_id=lineage_ids.root[0],
+    )
+    assert len(lineage_ids.root) == 1
+    assert fetched_lineage
+    assert fetched_lineage.input == lineages[0].input
diff --git a/tests/evaluation/benchmark/test_benchmark.py b/tests/evaluation/benchmark/test_benchmark.py
index 323dd581..da3cb932 100644
--- a/tests/evaluation/benchmark/test_benchmark.py
+++ b/tests/evaluation/benchmark/test_benchmark.py
@@ -140,7 +140,7 @@ def test_create_benchmark(
     aggregation_logic: DummyAggregationLogic,
 ) -> None:
     dataset_id = "fake_dataset_id"
-    mock_studio_client.create_benchmark.return_value = str(uuid4())  # type: ignore
+    mock_studio_client.submit_benchmark.return_value = str(uuid4())  # type: ignore
 
     benchmark = studio_benchmark_repository.create_benchmark(
         dataset_id, evaluation_logic, aggregation_logic, "benchmark_name"
@@ -148,7 +148,7 @@ def test_create_benchmark(
     uuid = UUID(benchmark.id)
     assert uuid
     assert benchmark.dataset_id == dataset_id
-    studio_benchmark_repository.client.create_benchmark.assert_called_once()  # type: ignore
+    studio_benchmark_repository.client.submit_benchmark.assert_called_once()  # type: ignore
 
 
 def test_create_benchmark_with_non_existing_dataset(
@@ -161,7 +161,7 @@ def test_create_benchmark_with_non_existing_dataset(
     response = Response()
     response.status_code = 400
 
-    mock_studio_client.create_benchmark.side_effect = HTTPError(  # type: ignore
+    mock_studio_client.submit_benchmark.side_effect = HTTPError(  # type: ignore
         "400 Client Error: Bad Request for url", response=response
     )
 
@@ -218,6 +218,7 @@ def test_execute_benchmark(
     task,
 ) -> None:
     mock_studio_client.get_benchmark.return_value = get_benchmark_response  # type: ignore
+    mock_studio_client.submit_trace.return_value = str(uuid4())  # type: ignore
     examples = [
         StudioExample(input="input0", expected_output="expected_output0"),
         StudioExample(input="input1", expected_output="expected_output1"),
@@ -239,5 +240,6 @@ def test_execute_benchmark(
         labels={"label"},
     )
 
-    mock_studio_client.create_benchmark_execution.assert_called_once()  # type: ignore
+    mock_studio_client.submit_benchmark_execution.assert_called_once()  # type: ignore
     assert mock_studio_client.submit_trace.call_count == 4  # type: ignore
+    mock_studio_client.submit_benchmark_lineages.assert_called_once()  # type: ignore
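
For orientation, a minimal usage sketch of the new lineage-submission API changed above. The helper name `upload_lineages` and all input/output/evaluation values are illustrative placeholders; only `BenchmarkLineage`, `PostBenchmarkLineagesResponse`, `StudioClient`, and `submit_benchmark_lineages` are taken from the diff, and the client is assumed to be already configured for a Studio project.

from collections.abc import Sequence

from intelligence_layer.connectors.studio.studio import (
    BenchmarkLineage,
    PostBenchmarkLineagesResponse,
    StudioClient,
)


def upload_lineages(
    client: StudioClient,
    benchmark_id: str,
    execution_id: str,
    trace_ids: Sequence[str],
) -> PostBenchmarkLineagesResponse:
    # One lineage per previously submitted trace; the field values below are placeholders.
    lineages = [
        BenchmarkLineage(
            trace_id=trace_id,
            input="example input",
            expected_output="expected output",
            output="actual output",
            evaluation={"score": 1.0},
            run_latency=0,
            run_tokens=0,
        )
        for trace_id in trace_ids
    ]
    # Lineages are gzip-compressed and sent in batches that stay below max_payload_size;
    # the response lists the IDs of the stored lineages (response.root).
    return client.submit_benchmark_lineages(
        benchmark_lineages=lineages,
        benchmark_id=benchmark_id,
        execution_id=execution_id,
    )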